diff --git a/inquisitor/app.py b/inquisitor/app.py index 4c585ab..e33da13 100644 --- a/inquisitor/app.py +++ b/inquisitor/app.py @@ -106,8 +106,7 @@ def deactivate(): params = request.get_json() if 'source' not in params and 'itemid' not in params: logger.error("Bad request params: {}".format(params)) - item = loader.WritethroughDict(os.path.join( - DUNGEON_PATH, params['source'], params['itemid'] + '.item')) + item = loader.load_item(params['source'], params['itemid']) if item['active']: logger.debug(f"Deactivating {params['source']}/{params['itemid']}") item['active'] = False @@ -118,8 +117,7 @@ def punt(): params = request.get_json() if 'source' not in params and 'itemid' not in params: logger.error("Bad request params: {}".format(params)) - item = loader.WritethroughDict(os.path.join( - DUNGEON_PATH, params['source'], params['itemid'] + '.item')) + item = loader.load_item(params['source'], params['itemid']) tomorrow = datetime.now() + timedelta(days=1) morning = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 6, 0, 0) til_then = morning.timestamp() - item['created'] @@ -134,8 +132,7 @@ def mass_deactivate(): for info in params.get('items', []): source = info['source'] itemid = info['itemid'] - item = loader.WritethroughDict(os.path.join( - DUNGEON_PATH, source, itemid + ".item")) + item = loader.load_item(source, itemid) if item['active']: logger.debug(f"Deactivating {info['source']}/{info['itemid']}") item['active'] = False diff --git a/inquisitor/loader.py b/inquisitor/loader.py index ab65a9f..b7e4129 100644 --- a/inquisitor/loader.py +++ b/inquisitor/loader.py @@ -1,18 +1,42 @@ import os import json + from inquisitor.configs import DUNGEON_PATH from inquisitor import error from inquisitor import timestamp + class WritethroughDict(): - """A wrapper for a dictionary saved to the disk.""" - def __init__(self, path): + """A wrapper for a dictionary saved to the file system.""" + + @staticmethod + def create(path, item): + """ + Creates a writethrough dictionary from a dictionary in memory and + initializes a file to save it. + """ + if os.path.isfile(path): + raise FileExistsError(path) + wd = WritethroughDict(path, item) + wd.flush() + return wd + + @staticmethod + def load(path): + """ + Creates a writethrough dictionary from an existing file in the + file system. + """ if not os.path.isfile(path): raise FileNotFoundError(path) - self.path = path with open(path) as f: - self.item = json.loads(f.read()) + item = json.load(f) + return WritethroughDict(path, item) + + def __init__(self, path, item): + self.path = path + self.item = item def __getitem__(self, key): return self.item[key] @@ -40,10 +64,17 @@ class WritethroughDict(): with open(self.path, 'w', encoding="utf8") as f: f.write(s) + def load_state(source_name): """Loads the state dictionary for a source.""" state_path = os.path.join(DUNGEON_PATH, source_name, "state") - return WritethroughDict(state_path) + return WritethroughDict.load(state_path) + + +def load_item(source_name, item_id): + """Loads an item from a source.""" + item_path = os.path.join(DUNGEON_PATH, source_name, f'{item_id}.item') + return WritethroughDict.load(item_path) def load_items(source_name): """ @@ -55,8 +86,7 @@ def load_items(source_name): for filename in os.listdir(cell_path): if filename.endswith('.item'): try: - path = os.path.join(cell_path, filename) - item = WritethroughDict(path) + item = load_item(source_name, filename[:-5]) items[item['id']] = item except Exception: errors.append(filename) @@ -74,8 +104,7 @@ def load_active_items(): for filename in os.listdir(cell_path): if filename.endswith('.item'): try: - path = os.path.join(cell_path, filename) - item = WritethroughDict(path) + item = load_item(cell_name, filename[:-5]) # The time-to-show field hides items until an expiry date. if 'tts' in item: tts_date = item['created'] + item['tts'] diff --git a/inquisitor/sources.py b/inquisitor/sources.py index 78b7ba2..dd50229 100644 --- a/inquisitor/sources.py +++ b/inquisitor/sources.py @@ -208,7 +208,7 @@ def item_callback(source_name, itemid): raise ImportError(f"Missing callback in '{source_name}'") # Load the source state and the origin item state = loader.load_state(source_name) - item = loader.WritethroughDict(os.path.join(DUNGEON_PATH, source_name, itemid + ".item")) + item = loader.load_item(source_name, itemid) # Execute callback source_module.callback(state, item) # Save any changes