Add support and CLI for actions

This commit is contained in:
Tim Van Baak 2023-05-31 20:50:56 -07:00
parent 48efb2d3cd
commit d9d383b138
5 changed files with 177 additions and 11 deletions

View File

@ -52,4 +52,4 @@ Each line written to the process's `stdout` will be parsed as a JSON object repr
If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update.
Item actions are performed by executing `action.<name>.exe` with `action.<name>.args` as arguments.
Item actions are performed by executing `action.<name>.exe` with `action.<name>.args` as arguments. The process will receive the item, serialized as JSON, on the first line of `stdin`. The process should write the item back to `stdout` as a single line of JSON with any updates from the action.

View File

@ -8,7 +8,7 @@ import os.path
import subprocess
import sys
from intake.source import fetch_items, LocalSource, update_items
from intake.source import fetch_items, LocalSource, update_items, execute_action
from intake.types import InvalidConfigException, SourceUpdateException
@ -52,14 +52,18 @@ def cmd_edit(cmd_args):
return 0
source_path.mkdir()
with (source_path / "intake.json").open("w") as f:
json.dump({
"fetch": {
"exe": "",
"args": [],
json.dump(
{
"fetch": {
"exe": "",
"args": [],
},
"actions": {},
"env": {},
},
"actions": {},
"env": {},
}, f, indent=2)
f,
indent=2,
)
# Make a copy of the config
source = LocalSource(data, args.source)
@ -131,6 +135,61 @@ def cmd_update(cmd_args):
return 0
def cmd_action(cmd_args):
"""Execute an action for an item."""
parser = argparse.ArgumentParser(
prog="intake action",
description=cmd_action.__doc__,
)
parser.add_argument(
"--data",
"-d",
default=intake_data_dir(),
help="Path to the intake data directory containing source directories",
)
parser.add_argument(
"--source",
"-s",
required=True,
help="Source name to fetch",
)
parser.add_argument(
"--item",
"-i",
required=True,
help="Item id to perform the action with",
)
parser.add_argument(
"--action",
"-a",
required=True,
help="Action to perform",
)
args = parser.parse_args(cmd_args)
source = LocalSource(Path(args.data), args.source)
try:
item = execute_action(source, args.item, args.action, 5)
print("Item:", item)
except InvalidConfigException as ex:
print("Could not fetch", args.source)
print(ex)
return 1
except SourceUpdateException as ex:
print(
"Error executing source",
args.source,
"item",
args.item,
"action",
args.action,
)
print(ex)
return 1
return 0
def cmd_feed(cmd_args):
"""Print the current feed."""
parser = argparse.ArgumentParser(
@ -160,8 +219,12 @@ def cmd_feed(cmd_args):
if not args.sources:
args.sources = [child.name for child in data.iterdir()]
sources = [LocalSource(data, name) for name in args.sources if (data / name / "intake.json").exists()]
items = [item for source in sources for item in source.get_all_items() ]
sources = [
LocalSource(data, name)
for name in args.sources
if (data / name / "intake.json").exists()
]
items = [item for source in sources for item in source.get_all_items()]
if not items:
print("Feed is empty")

View File

@ -166,6 +166,78 @@ def fetch_items(source: LocalSource, update_timeout=60):
return items
def execute_action(source: LocalSource, item_id: str, action: str, action_timeout=60):
"""
Execute the action for a feed source.
"""
# Load the item
item = source.get_item(item_id)
# Load the source's config
config = source.get_config()
actions = config.get("actions", {})
if action not in actions:
raise InvalidConfigException(f"Missing action {action}")
exe_name = config["actions"][action]["exe"]
exe_args = config["actions"][action].get("args", [])
# Overlay the current env with the config env and intake-provided values
exe_env = {
**os.environ.copy(),
**config.get("env", {}),
"STATE_PATH": str(source.get_state_path()),
}
# Launch the action command
try:
process = Popen(
[exe_name, *exe_args],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=source.source_path,
env=exe_env,
encoding="utf8",
)
except PermissionError:
raise SourceUpdateException("command not executable")
# While the update command is executing, watch its output
t_stderr = Thread(target=read_stderr, args=(process,), daemon=True)
t_stderr.start()
outs = []
t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True)
t_stdout.start()
# Send the item to the process
process.stdin.write(json.dumps(item))
process.stdin.write("\n")
process.stdin.flush()
# Time out the process if it takes too long
try:
process.wait(timeout=action_timeout)
except TimeoutExpired:
process.kill()
t_stdout.join(timeout=1)
t_stderr.join(timeout=1)
if process.poll():
raise SourceUpdateException("return code")
if not outs:
raise SourceUpdateException("no item")
try:
item = json.loads(outs[0])
source.save_item(item)
return item
except json.JSONDecodeError:
raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items):
"""
Update the source with a batch of new items, doing creations, updates, and

View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
import argparse, json, sys
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(json.dumps({"id": "caller", "action": {"value": 1}}))
if args.action == "increment":
item = sys.stdin.readline()
item = json.loads(item)
item["action"]["value"] += 1
print(json.dumps(item))
pass

View File

@ -0,0 +1,12 @@
{
"fetch": {
"exe": "./increment.py",
"args": ["fetch"]
},
"actions": {
"increment": {
"exe": "./increment.py",
"args": ["increment"]
}
}
}