diff --git a/inquisitor/importer.py b/inquisitor/importer.py index ba26905..da6684b 100644 --- a/inquisitor/importer.py +++ b/inquisitor/importer.py @@ -12,7 +12,7 @@ def update_sources(*source_names): for source_name in source_names: try: source_module = load_source(source_name) - except Exception as e: + except Exception: error.as_item("Error importing source '{}'".format(source_name), traceback.format_exc()) continue @@ -24,7 +24,7 @@ def update_sources(*source_names): state_path = os.path.join(cell_path, "state") with open(state_path, 'w', encoding='utf8') as f: f.write(json.dumps({})) - except Exception as e: + except Exception: error.as_item("Error initializing source '{}'".format(source_name), traceback.format_exc()) continue @@ -34,7 +34,7 @@ def update_sources(*source_names): logger.info("{} new item{}, {} deleted item{}".format( new_count, "s" if new_count != 1 else "", del_count, "s" if del_count != 1 else "")) - except Exception as e: + except Exception: error.as_item("Error updating source '{}'".format(source_name), traceback.format_exc()) def load_source(source_name): @@ -58,103 +58,112 @@ def load_source(source_name): raise ImportError("Missing fetch_new in '{}'".format(source_file_name)) # Since the source is valid, get or create the source cell. os.chdir(cwd) - cell_path = os.path.join(DUNGEON_PATH, source_name) return itemsource def update_source(source_name, fetch_new): """ Attempts to update the given source. Raises an exception if the source does. """ - cell_path = os.path.join(DUNGEON_PATH, source_name) - - # Get the existing items. + # Get the existing items from the source's cell. prior_items, errors = loader.load_items(source_name) + if any(errors): + raise Exception(f'Can\'t update source "{source_name}", some items are corrupt') logger.debug("Found {} prior items".format(len(prior_items))) - em = "Initial items: {}\n".format(" ".join(list(prior_items.keys()))) - em += "Errors: {}\n".format(" ".join(errors)) - # Get the new items. + # Get the feed items from the source's fetch method. state = loader.load_state(source_name) - new_items = fetch_new(state) - logger.debug("Fetched {} items".format(len(new_items))) + fetched = fetch_new(state) + fetched_items = {item['id']: item for item in fetched} state.flush() - em += "New items: {}\n".format(" ".join([ni['id'] for ni in new_items])) - new_count = 0 - del_count = 0 + # Populate all the fetched items with required or auto-generated fields. + # This also provides an opportunity to throw if the source isn't returning + # valid items. + for item in fetched_items.values(): + populate_new(source_name, item) + logger.debug("Fetched {} items".format(len(fetched_items))) + + # Write all the new fetched items to the source's cell. + new_items = [ + item for item in fetched_items.values() + if item['id'] not in prior_items] for item in new_items: - populate_new(item) + s = json.dumps(item) + path = os.path.join(DUNGEON_PATH, item['source'], item['id'] + ".item") + with open(path, 'w', encoding='utf8') as f: + f.write(s) - if item['id'] not in prior_items: - # If the item is new, write it. - new_count += 1 - s = json.dumps(item) - path = os.path.join(DUNGEON_PATH, item['source'], item['id'] + ".item") - with open(path, 'w', encoding="utf8") as f: - f.write(s) - em += "Initialized item {}\n".format(item['id']) + # Update the extant items using the fetched item's values. + extant_items = [ + item for item in fetched_items.values() + if item['id'] in prior_items] + for item in extant_items: + # The items in prior_items are writethrough dicts. + prior_item = prior_items[item['id']] + # Only bother updating active items. + if prior_item['active']: + populate_old(prior_item, item) - else: - # If the item is extant and still active, overwrite its values. - prior_item = prior_items[item['id']] - if prior_item['active']: - populate_old(prior_item, item) - em += "Updated item {}\n".format(item['id']) - # Remove the id from the list to track its continued presence - # in the source's queue of new items. - del prior_items[item['id']] - em += "Checked off item {}\n".format(item['id']) - - em += "Remaining items: {}\n".format(" ".join(list(prior_items.keys()))) - em += "ls dir: {}\n".format(" ".join(list(os.listdir(os.path.join(DUNGEON_PATH, source_name))))) - # Any remaining extant items are considered old. Old items are removed - # when they are both inactive and past their ttl date. + # In general, items are removed when they are old (not found in the last + # fetch) and inactive. Some item fields can change this basic behavior. + del_count = 0 now = timestamp.now() - for prior_id, prior_item in prior_items.items(): - if 'created' in prior_item and 'ttl' in prior_item: - ttl_date = prior_item['created'] + prior_item['ttl'] - else: - ttl_date = 0 - if not prior_item['active'] and ttl_date < now: - if em is not None: em += "Deleting item {}\n".format(prior_id) + old_items = [ + item for item in prior_items.values() + if item['id'] not in fetched_items] + for item in old_items: + remove = not item['active'] + # The time-to-live field protects an item from removal until expiry. + # This is mainly used to avoid old items resurfacing when their source + # cannot guarantee monotonicity. + if 'ttl' in item: + ttl_date = item['created'] + item['ttl'] + if ttl_date > now: + continue + # The time-to-die field can force an active item to be removed. + if 'ttd' in item: + ttd_date = item['created'] + item['ttd'] + if ttd_date < now: + remove = True + # Items to be removed are deleted + if remove: del_count += 1 - file_path = os.path.join(DUNGEON_PATH, prior_item['source'], prior_item['id'] + ".item") + file_path = os.path.join(DUNGEON_PATH, item['source'], item['id'] + ".item") try: os.remove(file_path) except: - if em is not None: - em += traceback.format_exc() - em += "ls dir: {}\n".format(list(os.listdir(os.path.join(DUNGEON_PATH, prior_item['source'])))) - error.as_item("Failed to delete {}".format(file_path), em) - em = None + error.as_item("Failed to delete {}".format(file_path)) # Note update timestamp in state state['last_updated'] = timestamp.now() # Return counts - return new_count, del_count + return len(new_items), del_count -def populate_new(item): - # id and source are required fields +def populate_new(source_name, item): + # id is required + if 'id' not in item: + raise Exception(f'Source "{source_name}" returned an item with no id') + # source is auto-populated with the source name if missing + if 'source' not in item: item['source'] = source_name + # active is forced to True for new items item['active'] = True - if 'created' not in item: item['created'] = timestamp.now() + # created is forced to the current timestamp + item['created'] = timestamp.now() + # title is auto-populated with the id if missing if 'title' not in item: item['title'] = item['id'] - if 'link' not in item: item['link'] = None - if 'time' not in item: item['time'] = None - if 'author' not in item: item['author'] = None - if 'body' not in item: item['body'] = None - tags = item['tags'] if 'tags' in item else [] - if item['source'] not in tags: tags.insert(0, item['source']) - item['tags'] = tags - if 'ttl' not in item: item['ttl'] = 0 + # tags is auto-populated if missing (not if empty!) + if 'tags' not in item: item['tags'] = [source_name] + # link, time, author, body, ttl, ttd, and tts are optional def populate_old(prior, new): - prior.set({ - 'title': new['title'], - 'link': new['link'], - 'time': new['time'], - 'author': new['author'], - 'body': new['body'], - 'tags': new['tags'], - 'ttl': new['ttl'], - }) + # Not updated: id, source, active, created + if 'title' in new: prior['title'] = new['title'] + if 'tags' in new: prior['tags'] = new['tags'] + if 'link' in new: prior['link'] = new['link'] + if 'time' in new: prior['time'] = new['time'] + if 'author' in new: prior['author'] = new['author'] + if 'body' in new: prior['body'] = new['body'] + if 'ttl' in new: prior['ttl'] = new['ttl'] + if 'ttd' in new: prior['ttd'] = new['ttd'] + if 'tts' in new: prior['tts'] = new['tts']