From fc583186dd916e724a8db19f53d62ea2724650ec Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Mon, 29 May 2023 16:19:29 -0700 Subject: [PATCH] Add update logic --- intake/__init__.py | 1 - intake/cli.py | 26 ++++++--- intake/source.py | 143 ++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 153 insertions(+), 17 deletions(-) diff --git a/intake/__init__.py b/intake/__init__.py index 6b75871..e69de29 100644 --- a/intake/__init__.py +++ b/intake/__init__.py @@ -1 +0,0 @@ -HELLO = "WORLD" diff --git a/intake/cli.py b/intake/cli.py index d25da33..315a56c 100644 --- a/intake/cli.py +++ b/intake/cli.py @@ -4,7 +4,7 @@ import os import os.path import sys -from .source import fetch_items +from .source import fetch_items, LocalSource, update_items from .types import InvalidConfigException, SourceUpdateException @@ -15,11 +15,11 @@ def intake_data_dir() -> Path: return intake_data -def cmd_fetch(cmd_args): - """Execute the fetch for a source.""" +def cmd_update(cmd_args): + """Fetch items for a source and update it.""" parser = argparse.ArgumentParser( - prog="intake fetch", - description=cmd_fetch.__doc__, + prog="intake update", + description=cmd_update.__doc__, ) parser.add_argument( "--base", @@ -30,14 +30,22 @@ def cmd_fetch(cmd_args): "--source", help="Source name to fetch", ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Instead of updating the source, print the fetched items" + ) args = parser.parse_args(cmd_args) ret = 0 - source_path = Path(args.base) / args.source + source = LocalSource(Path(args.base), args.source) try: - items = fetch_items(source_path) - for item in items: - print("Item:", item) + items = fetch_items(source) + if not args.dry_run: + update_items(source, items) + else: + for item in items: + print("Item:", item) except InvalidConfigException as ex: print("Could not fetch", args.source) print(ex) diff --git a/intake/source.py b/intake/source.py index 6a425bc..0797609 100755 --- a/intake/source.py +++ 
b/intake/source.py @@ -1,13 +1,72 @@ from pathlib import Path from subprocess import Popen, PIPE, TimeoutExpired from threading import Thread +from typing import List import json import os import os.path +import time from .types import InvalidConfigException, SourceUpdateException +class LocalSource: + """ + An intake source backed by a filesystem directory. + """ + def __init__(self, data_path: Path, source_name: str): + self.data_path: Path = data_path + self.source_name = source_name + self.source_path: Path = data_path / source_name + + def get_config(self) -> dict: + config_path = self.source_path / "intake.json" + with open(config_path, "r", encoding="utf8") as config_file: + return json.load(config_file) + + def get_state_path(self) -> Path: + return (self.source_path / "state").absolute() + + def get_item_path(self, item_id: dict) -> Path: + return self.source_path / f"{item_id}.item" + + def get_item_ids(self) -> List[str]: + return [ + filepath.name[:-5] + for filepath in self.source_path.iterdir() + if filepath.name.endswith(".item")] + + def item_exists(self, item_id) -> bool: + return self.get_item_path(item_id).exists() + + def new_item(self, item: dict) -> dict: + # Ensure required fields + if "id" not in item: + raise KeyError("id") + item["source"] = self.source_name + item["active"] = True + item["created"] = int(time.time()) + item["title"] = item.get("title", item["id"]) + item["tags"] = item.get("tags", [self.source_name]) + # All other fields are optional + self.save_item(item) + return item + + def get_item(self, item_id: str) -> dict: + with self.get_item_path(item_id).open() as f: + return json.load(f) + + def save_item(self, item: dict) -> None: + # Write to a tempfile first to avoid losing the item on write failure + tmp_path = self.source_path / f"{item['id']}.item.tmp" + with tmp_path.open("w") as f: + f.write(json.dumps(item, indent=2)) + os.rename(tmp_path, self.get_item_path(item['id'])) + + def delete_item(self, item_id) -> None: + 
os.remove(self.get_item_path(item_id)) + + def read_stdout(process: Popen, outs: list): """ Read the subprocess's stdout into memory. @@ -35,19 +94,17 @@ def read_stderr(process: Popen): break -def fetch_items(source_path: Path, update_timeout=60): +def fetch_items(source: LocalSource, update_timeout=60): """ Execute the feed source and return the current feed items. Returns a list of feed items on success. Throws SourceUpdateException if the feed source update failed. """ # Load the source's config to get its update command - config_path = source_path / "intake.json" - with open(config_path, "r", encoding="utf8") as config_file: - config = json.load(config_file) + config = source.get_config() if "fetch" not in config: - raise InvalidConfigException("Missing exe") + raise InvalidConfigException("Missing fetch") exe_name = config["fetch"]["exe"] exe_args = config["fetch"].get("args", []) @@ -56,7 +113,7 @@ def fetch_items(source_path: Path, update_timeout=60): exe_env = { **os.environ.copy(), **config.get("env", {}), - "STATE_PATH": str((source_path / "state").absolute()), + "STATE_PATH": str(source.get_state_path()), } # Launch the update command @@ -65,7 +122,7 @@ def fetch_items(source_path: Path, update_timeout=60): [exe_name, *exe_args], stdout=PIPE, stderr=PIPE, - cwd=source_path, + cwd=source.source_path, env=exe_env, encoding="utf8", ) @@ -100,3 +157,75 @@ def fetch_items(source_path: Path, update_timeout=60): raise SourceUpdateException("invalid json") return items + + +def update_items(source: LocalSource, fetched_items): + """ + Update the source with a batch of new items, doing creations, updates, and + deletions as necessary. + """ + # Get a list of item ids that already existed for this source. + prior_ids = source.get_item_ids() + print(f'Found {len(prior_ids)} prior items') + + # Determine which items are new and which are updates. 
+ new_items = [] + upd_items = [] + for item in fetched_items: + if source.item_exists(item["id"]): + upd_items.append(item) + else: + new_items.append(item) + + # Write all the new items to the source directory. + for item in new_items: + # TODO: support on-create trigger + source.new_item(item) + + # Update the other items using the fetched items' values. + for upd_item in upd_items: + old_item = source.get_item(upd_item["id"]) + for field in ("title", "tags", "link", "time", "author", "body", "ttl", "ttd", "tts"): + if field in upd_item and old_item[field] != upd_item[field]: + old_item[field] = upd_item[field] + if "callback" in upd_item: + # Because of the way this update happens, any fields that are set + # in the callback when the item is new will keep their original + # values, as those values reappear in new_item on subsequent + # updates. + old_item['callback'] = {**old_item['callback'], **upd_item['callback']} + + # Items are removed when they are old (not in the latest fetch) and + # inactive. Some item fields change this basic behavior. + del_count = 0 + now = int(time.time()) + upd_ids = [item["id"] for item in upd_items] + old_item_ids = [ + item_id for item_id in prior_ids + if item_id not in upd_ids] + + for item_id in old_item_ids: + item = source.get_item(item_id) + remove = not item["active"] + + # The time-to-live field protects an item from removal until expiry. + # This is mainly used to avoid old items resurfacing when their source + # cannot guarantee monotonicity. + if "ttl" in item: + ttl_date = item["created"] + item["ttl"] + if ttl_date > now: + continue + + # The time-to-die field puts a maximum lifespan on an item, removing it + # even if it is active. + if "ttd" in item: + ttd_date = item["created"] + item["ttd"] + if ttd_date < now: + remove = True + + # Items to be removed are deleted. + if remove: + source.delete_item(item["id"]) + del_count += 1 + + print(len(new_items), "new,", del_count, "deleted")