From d9d383b138a1f4a85996be2484a74574e50aea0b Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Wed, 31 May 2023 20:50:56 -0700 Subject: [PATCH] Add support and CLI for actions --- README.md | 2 +- intake/cli.py | 83 ++++++++++++++++++++++---- intake/source.py | 72 ++++++++++++++++++++++ tests/demo_basic_callback/increment.py | 19 ++++++ tests/demo_basic_callback/intake.json | 12 ++++ 5 files changed, 177 insertions(+), 11 deletions(-) create mode 100755 tests/demo_basic_callback/increment.py create mode 100644 tests/demo_basic_callback/intake.json diff --git a/README.md b/README.md index 5f966a9..586ead5 100644 --- a/README.md +++ b/README.md @@ -52,4 +52,4 @@ Each line written to the process's `stdout` will be parsed as a JSON object repr If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update. -Item actions are performed by executing `action..exe` with `action..args` as arguments. \ No newline at end of file +Item actions are performed by executing `action..exe` with `action..args` as arguments. The process will receive the item, serialized as JSON, on the first line of `stdin`. The process should write the item back to `stdout` as a single line of JSON with any updates from the action. diff --git a/intake/cli.py b/intake/cli.py index 6ddcfb9..c2852fe 100644 --- a/intake/cli.py +++ b/intake/cli.py @@ -8,7 +8,7 @@ import os.path import subprocess import sys -from intake.source import fetch_items, LocalSource, update_items +from intake.source import fetch_items, LocalSource, update_items, execute_action from intake.types import InvalidConfigException, SourceUpdateException @@ -52,14 +52,18 @@ def cmd_edit(cmd_args): return 0 source_path.mkdir() with (source_path / "intake.json").open("w") as f: - json.dump({ - "fetch": { - "exe": "", - "args": [], + json.dump( + { + "fetch": { + "exe": "", + "args": [], + }, + "actions": {}, + "env": {}, }, - "actions": {}, - "env": {}, - }, f, indent=2) + f, + indent=2, + ) # Make a copy of the config source = LocalSource(data, args.source) @@ -131,6 +135,61 @@ def cmd_update(cmd_args): return 0 +def cmd_action(cmd_args): + """Execute an action for an item.""" + parser = argparse.ArgumentParser( + prog="intake action", + description=cmd_action.__doc__, + ) + parser.add_argument( + "--data", + "-d", + default=intake_data_dir(), + help="Path to the intake data directory containing source directories", + ) + parser.add_argument( + "--source", + "-s", + required=True, + help="Source name to fetch", + ) + parser.add_argument( + "--item", + "-i", + required=True, + help="Item id to perform the action with", + ) + parser.add_argument( + "--action", + "-a", + required=True, + help="Action to perform", + ) + args = parser.parse_args(cmd_args) + + source = LocalSource(Path(args.data), args.source) + try: + item = execute_action(source, args.item, args.action, 5) + print("Item:", item) + except InvalidConfigException as ex: + print("Could not fetch", args.source) + print(ex) + return 1 + except SourceUpdateException as ex: + print( + "Error executing source", + args.source, + "item", + args.item, + "action", + args.action, + ) + print(ex) + return 1 + + return 0 + + def cmd_feed(cmd_args): """Print the current feed.""" parser = argparse.ArgumentParser( @@ -160,8 +219,12 @@ def cmd_feed(cmd_args): if not args.sources: args.sources = [child.name for child in data.iterdir()] - sources = [LocalSource(data, name) for name in args.sources if (data / name / "intake.json").exists()] - items = [item for source in sources for item in source.get_all_items() ] + sources = [ + LocalSource(data, name) + for name in args.sources + if (data / name / "intake.json").exists() + ] + items = [item for source in sources for item in source.get_all_items()] if not items: print("Feed is empty") diff --git a/intake/source.py b/intake/source.py index 804fc2d..919e071 100755 --- a/intake/source.py +++ b/intake/source.py @@ -166,6 +166,78 @@ def fetch_items(source: LocalSource, update_timeout=60): return items +def execute_action(source: LocalSource, item_id: str, action: str, action_timeout=60): + """ + Execute the action for a feed source. + """ + # Load the item + item = source.get_item(item_id) + + # Load the source's config + config = source.get_config() + + actions = config.get("actions", {}) + if action not in actions: + raise InvalidConfigException(f"Missing action {action}") + + exe_name = config["actions"][action]["exe"] + exe_args = config["actions"][action].get("args", []) + + # Overlay the current env with the config env and intake-provided values + exe_env = { + **os.environ.copy(), + **config.get("env", {}), + "STATE_PATH": str(source.get_state_path()), + } + + # Launch the action command + try: + process = Popen( + [exe_name, *exe_args], + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + cwd=source.source_path, + env=exe_env, + encoding="utf8", + ) + except PermissionError: + raise SourceUpdateException("command not executable") + + # While the update command is executing, watch its output + t_stderr = Thread(target=read_stderr, args=(process,), daemon=True) + t_stderr.start() + + outs = [] + t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True) + t_stdout.start() + + # Send the item to the process + process.stdin.write(json.dumps(item)) + process.stdin.write("\n") + process.stdin.flush() + + # Time out the process if it takes too long + try: + process.wait(timeout=action_timeout) + except TimeoutExpired: + process.kill() + t_stdout.join(timeout=1) + t_stderr.join(timeout=1) + + if process.poll(): + raise SourceUpdateException("return code") + + if not outs: + raise SourceUpdateException("no item") + try: + item = json.loads(outs[0]) + source.save_item(item) + return item + except json.JSONDecodeError: + raise SourceUpdateException("invalid json") + + def update_items(source: LocalSource, fetched_items): """ Update the source with a batch of new items, doing creations, updates, and diff --git a/tests/demo_basic_callback/increment.py b/tests/demo_basic_callback/increment.py new file mode 100755 index 0000000..619c9f8 --- /dev/null +++ b/tests/demo_basic_callback/increment.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 + +import argparse, json, sys + +parser = argparse.ArgumentParser() +parser.add_argument("action") +args = parser.parse_args() + +print("args:", args, file=sys.stderr, flush=True) + +if args.action == "fetch": + print(json.dumps({"id": "caller", "action": {"value": 1}})) + +if args.action == "increment": + item = sys.stdin.readline() + item = json.loads(item) + item["action"]["value"] += 1 + print(json.dumps(item)) + pass diff --git a/tests/demo_basic_callback/intake.json b/tests/demo_basic_callback/intake.json new file mode 100644 index 0000000..2922077 --- /dev/null +++ b/tests/demo_basic_callback/intake.json @@ -0,0 +1,12 @@ +{ + "fetch": { + "exe": "./increment.py", + "args": ["fetch"] + }, + "actions": { + "increment": { + "exe": "./increment.py", + "args": ["increment"] + } + } +} \ No newline at end of file