Refactor fetch into a special-case action

This commit is contained in:
Tim Van Baak 2023-06-01 16:41:50 -07:00
parent d9d383b138
commit 1b16a48f31
7 changed files with 92 additions and 112 deletions

View File

@ -27,11 +27,11 @@ intake
``` ```
{ {
"action": {
"fetch": { "fetch": {
"exe": "<absolute path to program or name on intake's PATH>", "exe": "<absolute path to program or name on intake's PATH>",
"args": ["list", "of", "program", "arguments"] "args": ["list", "of", "program", "arguments"]
}, },
"action": {
"<action name>": { "<action name>": {
"exe": "...", "exe": "...",
"args": "..." "args": "..."
@ -41,15 +41,18 @@ intake
} }
``` ```
`fetch` is required. If `action` or `env` are absent, they will be treated as if they were empty. Each key under `action` defines an action that can be taken for the source. The `fetch` action is required. `env` is optional. Each key under `env` will be set as an environment variable when executing actions.
When a feed source is updated, `fetch.exe` will be executed with `fetch.args` as arguments. The following environment variables will be set: When an action is executed, intake executes the `exe` program for the action with the corresponding `args` as arguments. The process's environment is as follows:
* intake's environment is inherited.
* `STATE_PATH` is set to the absolute path of `state`. * `STATE_PATH` is set to the absolute path of `state`.
* Each key in `env` in `config.json` is passed with its value. * Each key in `env` in `config.json` is passed with its value.
Each line written to the process's `stdout` will be parsed as a JSON object representing a feed item. Each line written to `stderr` will be logged by intake. `stdout` and `stderr` are decoded as UTF-8. Anything written to `stderr` by the process will be logged by intake.
If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update. The `fetch` action is used to fetch the current state of the feed source. It receives no input and should write feed items to `stdout` as JSON objects, each on one line. All other actions are taken in the context of a single item. These actions receive the item as a JSON object on the first line of `stdin`. The process should write the item back to `stdout` with any changes as a result of the action.
Item actions are performed by executing `action.<name>.exe` with `action.<name>.args` as arguments. The process will receive the item, serialized as JSON, on the first line of `stdin`. The process should write the item back to `stdout` as a single line of JSON with any updates from the action. An item must have a key under `action` with that action's name to support executing that action for that item.
All encoding is done with UTF-8. If an item cannot be parsed or the exit code of the process is nonzero, intake will consider the action to be a failure. No item changes or other feed changes will be made as a result of a failed action, except for changes to `state` made by the action process.

View File

@ -58,7 +58,7 @@ def cmd_edit(cmd_args):
"exe": "", "exe": "",
"args": [], "args": [],
}, },
"actions": {}, "action": {},
"env": {}, "env": {},
}, },
f, f,

View File

@ -1,3 +1,4 @@
from datetime import timedelta
from pathlib import Path from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread from threading import Thread
@ -74,7 +75,7 @@ class LocalSource:
yield json.loads(filepath.read_text(encoding="utf8")) yield json.loads(filepath.read_text(encoding="utf8"))
def read_stdout(process: Popen, outs: list): def read_stdout(process: Popen, output: list):
""" """
Read the subprocess's stdout into memory. Read the subprocess's stdout into memory.
This prevents the process from blocking when the pipe fills up. This prevents the process from blocking when the pipe fills up.
@ -83,7 +84,7 @@ def read_stdout(process: Popen, outs: list):
data = process.stdout.readline() data = process.stdout.readline()
if data: if data:
print(f"[stdout] <{repr(data)}>") print(f"[stdout] <{repr(data)}>")
outs.append(data) output.append(data)
if process.poll() is not None: if process.poll() is not None:
break break
@ -101,62 +102,83 @@ def read_stderr(process: Popen):
break break
def fetch_items(source: LocalSource, update_timeout=60): def execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta
):
""" """
Execute the feed source and return the current feed items. Execute the action from a given source. If stdin is specified, pass it
Returns a list of feed items on success. along to the process.
Throws SourceUpdateException if the feed source update failed.
""" """
# Load the source's config to get its update command # Gather the information necessary to launch the process
config = source.get_config() config = source.get_config()
action_cfg = config.get("action", {}).get(action)
if "fetch" not in config: if not action_cfg:
raise InvalidConfigException("Missing fetch") raise InvalidConfigException(f"No such action {action}")
if "exe" not in action_cfg:
raise InvalidConfigException(f"No exe for action {action}")
exe_name = config["fetch"]["exe"] command = [action_cfg["exe"], *action_cfg.get("args", [])]
exe_args = config["fetch"].get("args", []) env = {
# Overlay the current env with the config env and intake-provided values
exe_env = {
**os.environ.copy(), **os.environ.copy(),
**config.get("env", {}), **config.get("env", {}),
"STATE_PATH": str(source.get_state_path()), "STATE_PATH": str(source.get_state_path()),
} }
# Launch the update command # Launch the process
try: try:
process = Popen( process = Popen(
[exe_name, *exe_args], command,
stdin=PIPE,
stdout=PIPE, stdout=PIPE,
stderr=PIPE, stderr=PIPE,
cwd=source.source_path, cwd=source.source_path,
env=exe_env, env=env,
encoding="utf8", encoding="utf8",
) )
except PermissionError: except PermissionError:
raise SourceUpdateException("command not executable") raise SourceUpdateException(f"Command not executable: {''.join(command)}")
# While the update command is executing, watch its output # Kick off monitoring threads
t_stderr = Thread(target=read_stderr, args=(process,), daemon=True) output = []
t_stdout: Thread = Thread(target=read_stdout, args=(process, output), daemon=True)
t_stdout.start()
t_stderr: Thread = Thread(target=read_stderr, args=(process,), daemon=True)
t_stderr.start() t_stderr.start()
outs = [] # Send input to the process, if provided
t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True) if input:
t_stdout.start() process.stdin.write(input)
if not input.endswith("\n"):
process.stdin.write("\n")
process.stdin.flush()
# Time out the process if it takes too long
try: try:
process.wait(timeout=update_timeout) process.wait(timeout=timeout.total_seconds())
except TimeoutExpired: except TimeoutExpired:
process.kill() process.kill()
t_stdout.join(timeout=1) t_stdout.join(timeout=1)
t_stderr.join(timeout=1) t_stderr.join(timeout=1)
if process.poll(): if process.poll():
raise SourceUpdateException("return code") raise SourceUpdateException(
f"{source.source_name} {action} failed with code {process.returncode}"
)
return output
def fetch_items(source: LocalSource, timeout: int = 60):
"""
Execute the feed source and return the current feed items.
Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed.
"""
items = [] items = []
for line in outs:
output = execute_source_action(source, "fetch", None, timedelta(timeout))
for line in output:
try: try:
item = json.loads(line) item = json.loads(line)
items.append(item) items.append(item)
@ -166,72 +188,18 @@ def fetch_items(source: LocalSource, update_timeout=60):
return items return items
def execute_action(source: LocalSource, item_id: str, action: str, action_timeout=60): def execute_action(source: LocalSource, item_id: str, action: str, timeout: int = 60):
""" """
Execute the action for a feed source. Execute the action for a feed source.
""" """
# Load the item
item = source.get_item(item_id) item = source.get_item(item_id)
# Load the source's config output = execute_source_action(source, action, json.dumps(item), timedelta(timeout))
config = source.get_config() if not output:
actions = config.get("actions", {})
if action not in actions:
raise InvalidConfigException(f"Missing action {action}")
exe_name = config["actions"][action]["exe"]
exe_args = config["actions"][action].get("args", [])
# Overlay the current env with the config env and intake-provided values
exe_env = {
**os.environ.copy(),
**config.get("env", {}),
"STATE_PATH": str(source.get_state_path()),
}
# Launch the action command
try:
process = Popen(
[exe_name, *exe_args],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
cwd=source.source_path,
env=exe_env,
encoding="utf8",
)
except PermissionError:
raise SourceUpdateException("command not executable")
# While the update command is executing, watch its output
t_stderr = Thread(target=read_stderr, args=(process,), daemon=True)
t_stderr.start()
outs = []
t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True)
t_stdout.start()
# Send the item to the process
process.stdin.write(json.dumps(item))
process.stdin.write("\n")
process.stdin.flush()
# Time out the process if it takes too long
try:
process.wait(timeout=action_timeout)
except TimeoutExpired:
process.kill()
t_stdout.join(timeout=1)
t_stderr.join(timeout=1)
if process.poll():
raise SourceUpdateException("return code")
if not outs:
raise SourceUpdateException("no item") raise SourceUpdateException("no item")
try: try:
item = json.loads(outs[0]) item = json.loads(output[0])
source.save_item(item) source.save_item(item)
return item return item
except json.JSONDecodeError: except json.JSONDecodeError:

View File

@ -9,11 +9,16 @@ args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True) print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch": if args.action == "fetch":
print(json.dumps({"id": "caller", "action": {"value": 1}})) print(json.dumps({
"id": "updateme",
"action": {
"increment": 1
}
}))
if args.action == "increment": if args.action == "increment":
item = sys.stdin.readline() item = sys.stdin.readline()
item = json.loads(item) item = json.loads(item)
item["action"]["value"] += 1 item["action"]["increment"] += 1
print(json.dumps(item)) print(json.dumps(item))
pass pass

View File

@ -1,9 +1,9 @@
{ {
"action": {
"fetch": { "fetch": {
"exe": "./increment.py", "exe": "./increment.py",
"args": ["fetch"] "args": ["fetch"]
}, },
"actions": {
"increment": { "increment": {
"exe": "./increment.py", "exe": "./increment.py",
"args": ["increment"] "args": ["increment"]

View File

@ -1,7 +1,9 @@
{ {
"action": {
"fetch": { "fetch": {
"exe": "python3", "exe": "python3",
"args": ["update.py"] "args": ["update.py"]
}
}, },
"env": { "env": {
"HELLO": "WORLD" "HELLO": "WORLD"

View File

@ -1,4 +1,5 @@
{ {
"action": {
"fetch": { "fetch": {
"exe": "sh", "exe": "sh",
"args": [ "args": [
@ -6,4 +7,5 @@
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}" "echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
] ]
} }
}
} }