From 1b16a48f31ff86f465f34680744aedd91735021d Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Thu, 1 Jun 2023 16:41:50 -0700 Subject: [PATCH] Refactor fetch into a special-case action --- README.md | 21 ++-- intake/cli.py | 2 +- intake/source.py | 140 ++++++++++--------------- tests/demo_basic_callback/increment.py | 9 +- tests/demo_basic_callback/intake.json | 10 +- tests/demo_logging/intake.json | 8 +- tests/demo_raw_sh/intake.json | 14 +-- 7 files changed, 92 insertions(+), 112 deletions(-) diff --git a/README.md b/README.md index 586ead5..9297ddb 100644 --- a/README.md +++ b/README.md @@ -27,11 +27,11 @@ intake ``` { - "fetch": { - "exe": "", - "args": ["list", "of", "program", "arguments"] - }, "action": { + "fetch": { + "exe": "", + "args": ["list", "of", "program", "arguments"] + }, "": { "exe": "...", "args": "..." @@ -41,15 +41,18 @@ intake } ``` -`fetch` is required. If `action` or `env` are absent, they will be treated as if they were empty. +Each key under `action` defines an action that can be taken for the source. The `fetch` action is required. `env` is optional. Each key under `env` will be set as an environment variable when executing actions. -When a feed source is updated, `fetch.exe` will be executed with `fetch.args` as arguments. The following environment variables will be set: +When an action is executed, intake executes the `exe` program for the action with the corresponding `args` as arguments. The process's environment is as follows: +* intake's environment is inherited. * `STATE_PATH` is set to the absolute path of `state`. * Each key in `env` in `config.json` is passed with its value. -Each line written to the process's `stdout` will be parsed as a JSON object representing a feed item. Each line written to `stderr` will be logged by intake. `stdout` and `stderr` are decoded as UTF-8. +Anything written to `stderr` by the process will be logged by intake. -If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update. +The `fetch` action is used to fetch the current state of the feed source. It receives no input and should write feed items to `stdout` as JSON objects, each on one line. All other actions are taken in the context of a single item. These actions receive the item as a JSON object on the first line of `stdin`. The process should write the item back to `stdout` with any changes as a result of the action. -Item actions are performed by executing `action..exe` with `action..args` as arguments. The process will receive the item, serialized as JSON, on the first line of `stdin`. The process should write the item back to `stdout` as a single line of JSON with any updates from the action. +An item must have a key under action` with that action's name to support executing that action for that item. + +All encoding is done with UTF-8. If an item cannot be parsed or the exit code of the process is nonzero, intake will consider the action to be a failure. No items or other feed changes will happen as a result of a failed action, except for changes to `state` done by the action process. diff --git a/intake/cli.py b/intake/cli.py index c2852fe..1f5ed97 100644 --- a/intake/cli.py +++ b/intake/cli.py @@ -58,7 +58,7 @@ def cmd_edit(cmd_args): "exe": "", "args": [], }, - "actions": {}, + "action": {}, "env": {}, }, f, diff --git a/intake/source.py b/intake/source.py index 919e071..5ac5054 100755 --- a/intake/source.py +++ b/intake/source.py @@ -1,3 +1,4 @@ +from datetime import timedelta from pathlib import Path from subprocess import Popen, PIPE, TimeoutExpired from threading import Thread @@ -74,7 +75,7 @@ class LocalSource: yield json.loads(filepath.read_text(encoding="utf8")) -def read_stdout(process: Popen, outs: list): +def read_stdout(process: Popen, output: list): """ Read the subprocess's stdout into memory. This prevents the process from blocking when the pipe fills up. @@ -83,7 +84,7 @@ def read_stdout(process: Popen, outs: list): data = process.stdout.readline() if data: print(f"[stdout] <{repr(data)}>") - outs.append(data) + output.append(data) if process.poll() is not None: break @@ -101,62 +102,83 @@ def read_stderr(process: Popen): break -def fetch_items(source: LocalSource, update_timeout=60): +def execute_source_action( + source: LocalSource, action: str, input: str, timeout: timedelta +): """ - Execute the feed source and return the current feed items. - Returns a list of feed items on success. - Throws SourceUpdateException if the feed source update failed. + Execute the action from a given source. If stdin is specified, pass it + along to the process. """ - # Load the source's config to get its update command + # Gather the information necessary to launch the process config = source.get_config() + action_cfg = config.get("action", {}).get(action) - if "fetch" not in config: - raise InvalidConfigException("Missing fetch") + if not action_cfg: + raise InvalidConfigException(f"No such action {action}") + if "exe" not in action_cfg: + raise InvalidConfigException(f"No exe for action {action}") - exe_name = config["fetch"]["exe"] - exe_args = config["fetch"].get("args", []) - - # Overlay the current env with the config env and intake-provided values - exe_env = { + command = [action_cfg["exe"], *action_cfg.get("args", [])] + env = { **os.environ.copy(), **config.get("env", {}), "STATE_PATH": str(source.get_state_path()), } - # Launch the update command + # Launch the process try: process = Popen( - [exe_name, *exe_args], + command, + stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=source.source_path, - env=exe_env, + env=env, encoding="utf8", ) except PermissionError: - raise SourceUpdateException("command not executable") + raise SourceUpdateException(f"Command not executable: {''.join(command)}") - # While the update command is executing, watch its output - t_stderr = Thread(target=read_stderr, args=(process,), daemon=True) + # Kick off monitoring threads + output = [] + t_stdout: Thread = Thread(target=read_stdout, args=(process, output), daemon=True) + t_stdout.start() + t_stderr: Thread = Thread(target=read_stderr, args=(process,), daemon=True) t_stderr.start() - outs = [] - t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True) - t_stdout.start() + # Send input to the process, if provided + if input: + process.stdin.write(input) + if not input.endswith("\n"): + process.stdin.write("\n") + process.stdin.flush() - # Time out the process if it takes too long try: - process.wait(timeout=update_timeout) + process.wait(timeout=timeout.total_seconds()) except TimeoutExpired: process.kill() t_stdout.join(timeout=1) t_stderr.join(timeout=1) if process.poll(): - raise SourceUpdateException("return code") + raise SourceUpdateException( + f"{source.source_name} {action} failed with code {process.returncode}" + ) + return output + + +def fetch_items(source: LocalSource, timeout: int = 60): + """ + Execute the feed source and return the current feed items. + Returns a list of feed items on success. + Throws SourceUpdateException if the feed source update failed. + """ items = [] - for line in outs: + + output = execute_source_action(source, "fetch", None, timedelta(timeout)) + + for line in output: try: item = json.loads(line) items.append(item) @@ -166,72 +188,18 @@ def fetch_items(source: LocalSource, update_timeout=60): return items -def execute_action(source: LocalSource, item_id: str, action: str, action_timeout=60): +def execute_action(source: LocalSource, item_id: str, action: str, timeout: int = 60): """ Execute the action for a feed source. """ - # Load the item item = source.get_item(item_id) - # Load the source's config - config = source.get_config() - - actions = config.get("actions", {}) - if action not in actions: - raise InvalidConfigException(f"Missing action {action}") - - exe_name = config["actions"][action]["exe"] - exe_args = config["actions"][action].get("args", []) - - # Overlay the current env with the config env and intake-provided values - exe_env = { - **os.environ.copy(), - **config.get("env", {}), - "STATE_PATH": str(source.get_state_path()), - } - - # Launch the action command - try: - process = Popen( - [exe_name, *exe_args], - stdin=PIPE, - stdout=PIPE, - stderr=PIPE, - cwd=source.source_path, - env=exe_env, - encoding="utf8", - ) - except PermissionError: - raise SourceUpdateException("command not executable") - - # While the update command is executing, watch its output - t_stderr = Thread(target=read_stderr, args=(process,), daemon=True) - t_stderr.start() - - outs = [] - t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True) - t_stdout.start() - - # Send the item to the process - process.stdin.write(json.dumps(item)) - process.stdin.write("\n") - process.stdin.flush() - - # Time out the process if it takes too long - try: - process.wait(timeout=action_timeout) - except TimeoutExpired: - process.kill() - t_stdout.join(timeout=1) - t_stderr.join(timeout=1) - - if process.poll(): - raise SourceUpdateException("return code") - - if not outs: + output = execute_source_action(source, action, json.dumps(item), timedelta(timeout)) + if not output: raise SourceUpdateException("no item") + try: - item = json.loads(outs[0]) + item = json.loads(output[0]) source.save_item(item) return item except json.JSONDecodeError: diff --git a/tests/demo_basic_callback/increment.py b/tests/demo_basic_callback/increment.py index 619c9f8..56a4239 100755 --- a/tests/demo_basic_callback/increment.py +++ b/tests/demo_basic_callback/increment.py @@ -9,11 +9,16 @@ args = parser.parse_args() print("args:", args, file=sys.stderr, flush=True) if args.action == "fetch": - print(json.dumps({"id": "caller", "action": {"value": 1}})) + print(json.dumps({ + "id": "updateme", + "action": { + "increment": 1 + } + })) if args.action == "increment": item = sys.stdin.readline() item = json.loads(item) - item["action"]["value"] += 1 + item["action"]["increment"] += 1 print(json.dumps(item)) pass diff --git a/tests/demo_basic_callback/intake.json b/tests/demo_basic_callback/intake.json index 2922077..4f5074b 100644 --- a/tests/demo_basic_callback/intake.json +++ b/tests/demo_basic_callback/intake.json @@ -1,9 +1,9 @@ { - "fetch": { - "exe": "./increment.py", - "args": ["fetch"] - }, - "actions": { + "action": { + "fetch": { + "exe": "./increment.py", + "args": ["fetch"] + }, "increment": { "exe": "./increment.py", "args": ["increment"] diff --git a/tests/demo_logging/intake.json b/tests/demo_logging/intake.json index 1a9f6be..11935be 100644 --- a/tests/demo_logging/intake.json +++ b/tests/demo_logging/intake.json @@ -1,7 +1,9 @@ { - "fetch": { - "exe": "python3", - "args": ["update.py"] + "action": { + "fetch": { + "exe": "python3", + "args": ["update.py"] + } }, "env": { "HELLO": "WORLD" diff --git a/tests/demo_raw_sh/intake.json b/tests/demo_raw_sh/intake.json index 17cdb70..c93b584 100644 --- a/tests/demo_raw_sh/intake.json +++ b/tests/demo_raw_sh/intake.json @@ -1,9 +1,11 @@ { - "fetch": { - "exe": "sh", - "args": [ - "-c", - "echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}" - ] + "action": { + "fetch": { + "exe": "sh", + "args": [ + "-c", + "echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}" + ] + } } }