inquisitor/inquisitor/loader.py

187 lines
4.6 KiB
Python
Raw Normal View History

import os
import json
2020-08-06 22:38:24 +00:00
2020-12-29 01:46:57 +00:00
from inquisitor.configs import DUNGEON_PATH, logger
2020-06-02 18:49:02 +00:00
from inquisitor import error
2020-06-02 20:46:53 +00:00
from inquisitor import timestamp
2020-08-06 22:38:24 +00:00
class WritethroughDict():
    """
    A wrapper for a dictionary saved to the file system.

    The wrapped dictionary is serialized as JSON to `path`. Every item
    assignment rewrites that file, so the file tracks the in-memory
    dictionary. Only assignment through this wrapper triggers a write;
    mutating nested values obtained via lookup does not.
    """

    @staticmethod
    def create(path, item):
        """
        Creates a writethrough dictionary from a dictionary in memory and
        initializes a file to save it.

        Raises FileExistsError if a file already exists at `path`.
        """
        if os.path.isfile(path):
            raise FileExistsError(path)
        wd = WritethroughDict(path, item)
        wd.flush()
        return wd

    @staticmethod
    def load(path):
        """
        Creates a writethrough dictionary from an existing file in the
        file system.

        Raises FileNotFoundError if there is no file at `path`.
        """
        if not os.path.isfile(path):
            raise FileNotFoundError(path)
        # Read with the same encoding flush() writes with, so the result
        # does not depend on the platform's default encoding.
        with open(path, encoding="utf8") as f:
            item = json.load(f)
        return WritethroughDict(path, item)

    def __init__(self, path, item):
        # Wraps `item` without touching the file system; use create() or
        # load() to tie the instance to a file.
        self.path = path
        self.item = item

    def __getitem__(self, key):
        return self.item[key]

    def get(self, *args, **kwargs):
        """Forwards to the wrapped dictionary's get()."""
        return self.item.get(*args, **kwargs)

    def __setitem__(self, key, value):
        # Write through: persist the whole dictionary after each assignment.
        self.item[key] = value
        self.flush()

    def __contains__(self, key):
        return key in self.item

    def __repr__(self):
        return repr(self.item)

    def __str__(self):
        return str(self.item)

    def flush(self):
        """Writes the wrapped dictionary to `self.path` as indented JSON."""
        # Serialize before opening so a serialization error cannot leave
        # a truncated file behind.
        s = json.dumps(self.item, indent=2)
        with open(self.path, 'w', encoding="utf8") as f:
            f.write(s)
2020-08-06 22:38:24 +00:00
def load_state(source_name):
    """
    Loads the state dictionary for a source from the "state" file in
    that source's directory.
    """
    return WritethroughDict.load(
        os.path.join(DUNGEON_PATH, source_name, "state")
    )
def load_item(source_name, item_id):
    """
    Loads the item with the given id from a source as a writethrough
    dictionary.
    """
    filename = f'{item_id}.item'
    return WritethroughDict.load(
        os.path.join(DUNGEON_PATH, source_name, filename)
    )
2020-08-07 06:12:59 +00:00
def item_exists(source_name, item_id):
    """
    Returns True if the item's file is present in the source's
    directory, False otherwise.
    """
    filename = f'{item_id}.item'
    return os.path.isfile(os.path.join(DUNGEON_PATH, source_name, filename))
def get_item_ids(cell_name):
    """
    Returns a list of item ids in the given cell, i.e. the names of the
    cell's '.item' files with that suffix stripped.
    """
    suffix = '.item'
    item_ids = []
    for entry in os.listdir(os.path.join(DUNGEON_PATH, cell_name)):
        if entry.endswith(suffix):
            item_ids.append(entry[:-len(suffix)])
    return item_ids
def new_item(source_name, item):
    """
    Creates a new item with the fields in the provided dictionary.
    Initializes other fields to their default values.

    The provided dictionary is modified in place and becomes the
    dictionary wrapped by the returned WritethroughDict.

    Raises ValueError if the item has no id, and FileExistsError if an
    item file with that id already exists in the target source.
    """
    # id is required
    if 'id' not in item:
        raise ValueError(f'Cannot create item with no id. Value = {item}')
    # source must be filled in, so if it is absent it is auto-populated with
    # source_name. Note: this allows sources to fill in a different source.
    item.setdefault('source', source_name)
    # active is forced to True for new items
    item['active'] = True
    # created is forced to the current timestamp
    item['created'] = timestamp.now()
    # title is auto-populated with the id if missing
    item.setdefault('title', item['id'])
    # tags is auto-populated if missing (not if empty!)
    if 'tags' not in item:
        item['tags'] = [source_name]
    # All other fields are optional.
    # The file goes under the item's own source, which may differ from
    # source_name when the item supplied one.
    item_path = os.path.join(DUNGEON_PATH, item['source'], f'{item["id"]}.item')
    return WritethroughDict.create(item_path, item)
2020-08-12 04:56:38 +00:00
def delete_item(source_name, item_id):
    """
    Removes the file backing the given item from the source's directory.
    """
    filename = f'{item_id}.item'
    os.remove(os.path.join(DUNGEON_PATH, source_name, filename))
def load_items(source_name):
    """
    Loads every '.item' file in a source's directory.

    Returns a map of ids to items and a list of the filenames that
    could not be loaded.
    """
    cell_dir = os.path.join(DUNGEON_PATH, source_name)
    loaded, unreadable = {}, []
    for entry in os.listdir(cell_dir):
        if not entry.endswith('.item'):
            continue
        try:
            record = load_item(source_name, entry[:-5])
            # Keyed by the id stored in the item, not by the filename.
            loaded[record['id']] = record
        except Exception:
            unreadable.append(entry)
    return loaded, unreadable
2020-12-29 01:46:57 +00:00
def load_active_items(source_names):
    """
    Collects the items that should currently be shown.

    Returns a list of active items and a list of unreadable item
    filenames. If `source_names` is non-empty, only those sources are
    read; otherwise every entry in the dungeon directory is tried.
    """
    current_time = timestamp.now()
    visible = []
    unreadable = []
    for source_name in (source_names or os.listdir(DUNGEON_PATH)):
        cell_dir = os.path.join(DUNGEON_PATH, source_name)
        if not os.path.isdir(cell_dir):
            logger.warning(f'Skipping nonexistent source {source_name}')
            continue
        item_files = [
            entry for entry in os.listdir(cell_dir)
            if entry.endswith('.item')
        ]
        for item_file in item_files:
            try:
                candidate = load_item(source_name, item_file[:-5])
                # The time-to-show field hides an item until `tts` has
                # elapsed since its creation.
                hidden = (
                    'tts' in candidate
                    and current_time < candidate['created'] + candidate['tts']
                )
                # Inactive items are never shown.
                if hidden or not candidate['active']:
                    continue
                visible.append(candidate)
            except Exception:
                # Anything that fails to load or lacks an expected field
                # is reported by filename instead of aborting the scan.
                unreadable.append(item_file)
    return visible, unreadable