From 7afd5e14846417c4f6e80785e2541b043b84457a Mon Sep 17 00:00:00 2001
From: Tim Van Baak
Date: Mon, 29 May 2023 14:58:22 -0700
Subject: [PATCH] Add submodule and cli for executing source fetches

---
 README.md                  |  55 ++++++++++++++++++
 intake/__init__.py         |   2 +-
 intake/cli.py              | 115 ++++++++++++++++++++++++++++++++++++-
 intake/source.py           | 114 ++++++++++++++++++++++++++++++++++++
 intake/types.py            |  21 +++++++
 tests/source01/intake.json |   9 +++
 tests/source01/update.py   |  14 +++++
 7 files changed, 327 insertions(+), 3 deletions(-)
 create mode 100755 intake/source.py
 create mode 100644 intake/types.py
 create mode 100644 tests/source01/intake.json
 create mode 100755 tests/source01/update.py

diff --git a/README.md b/README.md
index e69de29..5f966a9 100644
--- a/README.md
+++ b/README.md
@@ -0,0 +1,55 @@
+# intake
+
+`intake` is an arbitrary feed aggregator.
+
+## Feed Source Interface
+
+The base `intake` directory is `$XDG_DATA_HOME/intake`. Each feed source's data is contained within a subdirectory of the base directory. The name of the feed source is the name of the subdirectory.
+
+Feed source directories have the following structure:
+
+```
+intake
+ |- <source name>
+ |   |- intake.json
+ |   |- state
+ |   |- <item id>.item
+ |   |- <item id>.item
+ |   |- ...
+ |- <source name>
+ |   | ...
+ | ...
+```
+
+`intake.json` must be present; the other files are optional. Each `.item` file contains the data for one feed item. `state` provides a file for the feed source to write arbitrary data, e.g. JSON or binary data.
+
+`intake.json` has the following structure:
+
+```
+{
+  "fetch": {
+    "exe": "<program to execute>",
+    "args": ["list", "of", "program", "arguments"]
+  },
+  "action": {
+    "<action name>": {
+      "exe": "...",
+      "args": "..."
+    }
+  },
+  "env": { ... }
+}
+```
+
+`fetch` is required. If `action` or `env` are absent, they will be treated as if they were empty.
+
+When a feed source is updated, `fetch.exe` will be executed with `fetch.args` as arguments. The following environment variables will be set:
+
+* `STATE_PATH` is set to the absolute path of `state`.
+* Each key in `env` in `intake.json` is passed with its value.
+
+Each line written to the process's `stdout` will be parsed as a JSON object representing a feed item. Each line written to `stderr` will be logged by intake. `stdout` and `stderr` are decoded as UTF-8.
+
+If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update.
+
+Item actions are performed by executing `action.<name>.exe` with `action.<name>.args` as arguments.
\ No newline at end of file
diff --git a/intake/__init__.py b/intake/__init__.py
index c79c806..6b75871 100644
--- a/intake/__init__.py
+++ b/intake/__init__.py
@@ -1 +1 @@
-HELLO = "WORLD"
\ No newline at end of file
+HELLO = "WORLD"
diff --git a/intake/cli.py b/intake/cli.py
index bf513cd..d25da33 100644
--- a/intake/cli.py
+++ b/intake/cli.py
@@ -1,9 +1,120 @@
+from pathlib import Path
+import argparse
 import os
+import os.path
 import sys
 
-def main():
+from .source import fetch_items
+from .types import InvalidConfigException, SourceUpdateException
+
+
+def intake_data_dir() -> Path:
+    """
+    Return the intake base directory, `$XDG_DATA_HOME/intake`.
+    `~/.local/share` is used when XDG_DATA_HOME is unset or empty.
+    """
+    xdg_data_home = os.environ.get("XDG_DATA_HOME")
+    if xdg_data_home:
+        data_home = Path(xdg_data_home)
+    else:
+        # Only look up the home directory when it is actually needed
+        data_home = Path.home() / ".local" / "share"
+    return data_home / "intake"
+
+
+def cmd_fetch(cmd_args):
+    """Execute the fetch for a source."""
+    parser = argparse.ArgumentParser(
+        prog="intake fetch",
+        description=cmd_fetch.__doc__,
+    )
+    parser.add_argument(
+        "--base",
+        default=intake_data_dir(),
+        help="Path to the intake data directory containing source directories",
+    )
+    parser.add_argument(
+        "--source",
+        required=True,
+        help="Source name to fetch",
+    )
+    args = parser.parse_args(cmd_args)
+    ret = 0
+
+    source_path = Path(args.base) / args.source
     try:
-        print("Hello, world!")
+        items = fetch_items(source_path)
+        for item in items:
+            print("Item:", item)
+    except InvalidConfigException as ex:
+        print("Could not fetch", args.source)
+        print(ex)
+        ret = 1
+    except SourceUpdateException as ex:
+        print("Error updating source", args.source)
+        print(ex)
+        ret = 1
+
+    return ret
+
+
+def cmd_help(_):
+    """Print the help text."""
+    print_usage()
+    return 0
+
+
+def execute_cli():
+    """
+    Internal entry point for CLI execution.
+    """
+
+    # Collect the commands in this module.
+    cli = sys.modules[__name__]
+    commands = {
+        name[4:]: func for name, func in vars(cli).items() if name.startswith("cmd_")
+    }
+    names_width = max(map(len, commands.keys()))
+    desc_fmt = f" {{0:<{names_width}}} {{1}}"
+    descriptions = "\n".join(
+        [desc_fmt.format(name, func.__doc__) for name, func in commands.items()]
+    )
+
+    # Set up the top-level parser
+    parser = argparse.ArgumentParser(
+        prog="intake",
+        description=f"Available commands:\n{descriptions}\n",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        # add_help=False,
+    )
+    parser.add_argument(
+        "command",
+        nargs="?",
+        default="help",
+        help="The command to execute",
+        choices=commands,
+        metavar="command",
+    )
+    parser.add_argument(
+        "args", nargs=argparse.REMAINDER, help="Command arguments", metavar="args"
+    )
+
+    # Extract the usage print for cmd_help
+    global print_usage
+    print_usage = parser.print_help
+
+    args = parser.parse_args()
+
+    # Execute command
+    sys.exit(commands[args.command](args.args))
+
+
+def main():
+    """
+    Main entry point for CLI execution.
+    """
+    try:
+        execute_cli()
     except BrokenPipeError:
         # See https://docs.python.org/3.10/library/signal.html#note-on-sigpipe
         devnull = os.open(os.devnull, os.O_WRONLY)
diff --git a/intake/source.py b/intake/source.py
new file mode 100755
index 0000000..6a425bc
--- /dev/null
+++ b/intake/source.py
@@ -0,0 +1,114 @@
+from pathlib import Path
+from subprocess import Popen, PIPE, TimeoutExpired
+from threading import Thread
+import json
+import os
+import os.path
+
+from .types import InvalidConfigException, SourceUpdateException
+
+
+def read_stdout(process: Popen, outs: list):
+    """
+    Read the subprocess's stdout into memory, appending each line to outs.
+    This prevents the process from blocking when the pipe fills up.
+    """
+    # Read until EOF so that lines still buffered in the pipe when the
+    # process exits are not dropped.
+    for data in process.stdout:
+        print(f"[stdout] <{repr(data)}>")
+        outs.append(data)
+
+
+def read_stderr(process: Popen):
+    """
+    Read the subprocess's stderr stream and pass it to logging.
+    This prevents the process from blocking when the pipe fills up.
+    """
+    # Read until EOF so that trailing output is not dropped.
+    for data in process.stderr:
+        print(f"[stderr] <{repr(data)}>")
+
+
+def fetch_items(source_path: Path, update_timeout=60):
+    """
+    Execute the feed source and return the current feed items.
+
+    source_path is the feed source's directory, which contains intake.json.
+    update_timeout is how many seconds the fetch may run before it is killed.
+
+    Returns a list of feed items on success.
+    Throws InvalidConfigException if intake.json is missing or invalid.
+    Throws SourceUpdateException if the feed source update failed.
+    """
+    # Load the source's config to get its update command
+    config_path = source_path / "intake.json"
+    try:
+        with open(config_path, "r", encoding="utf8") as config_file:
+            config = json.load(config_file)
+    except FileNotFoundError as ex:
+        raise InvalidConfigException("Missing intake.json") from ex
+    except json.JSONDecodeError as ex:
+        raise InvalidConfigException("Invalid intake.json") from ex
+
+    if "fetch" not in config:
+        raise InvalidConfigException("Missing fetch")
+    if "exe" not in config["fetch"]:
+        raise InvalidConfigException("Missing exe")
+
+    exe_name = config["fetch"]["exe"]
+    exe_args = config["fetch"].get("args", [])
+
+    # Overlay the current env with the config env and intake-provided values
+    exe_env = {
+        **os.environ.copy(),
+        **config.get("env", {}),
+        "STATE_PATH": str((source_path / "state").absolute()),
+    }
+
+    # Launch the update command
+    try:
+        process = Popen(
+            [exe_name, *exe_args],
+            stdout=PIPE,
+            stderr=PIPE,
+            cwd=source_path,
+            env=exe_env,
+            encoding="utf8",
+        )
+    except FileNotFoundError as ex:
+        raise SourceUpdateException("command not found") from ex
+    except PermissionError as ex:
+        raise SourceUpdateException("command not executable") from ex
+
+    # While the update command is executing, watch its output
+    t_stderr = Thread(target=read_stderr, args=(process,), daemon=True)
+    t_stderr.start()
+
+    outs = []
+    t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True)
+    t_stdout.start()
+
+    # Time out the process if it takes too long
+    try:
+        process.wait(timeout=update_timeout)
+    except TimeoutExpired:
+        process.kill()
+        # Reap the killed process so that its return code is available
+        process.wait()
+    t_stdout.join(timeout=1)
+    t_stderr.join(timeout=1)
+
+    # A timed-out process was killed, so it also fails this check
+    if process.returncode != 0:
+        raise SourceUpdateException("return code")
+
+    items = []
+    for line in outs:
+        try:
+            item = json.loads(line)
+            items.append(item)
+        except json.JSONDecodeError as ex:
+            raise SourceUpdateException("invalid json") from ex
+
+    return items
diff --git a/intake/types.py b/intake/types.py
new file mode 100644
index 0000000..d2cea93
--- /dev/null
+++ b/intake/types.py
@@ -0,0 +1,21 @@
+"""
+Common exception types.
+"""
+
+
+class IntakeException(Exception):
+    """
+    Base class for intake application exceptions.
+    """
+
+
+class InvalidConfigException(IntakeException):
+    """
+    Could not interact with a source because the source's config was not valid.
+    """
+
+
+class SourceUpdateException(IntakeException):
+    """
+    The source update process did not return valid data and signal success.
+    """
diff --git a/tests/source01/intake.json b/tests/source01/intake.json
new file mode 100644
index 0000000..1a9f6be
--- /dev/null
+++ b/tests/source01/intake.json
@@ -0,0 +1,9 @@
+{
+  "fetch": {
+    "exe": "python3",
+    "args": ["update.py"]
+  },
+  "env": {
+    "HELLO": "WORLD"
+  }
+}
\ No newline at end of file
diff --git a/tests/source01/update.py b/tests/source01/update.py
new file mode 100755
index 0000000..28c61a0
--- /dev/null
+++ b/tests/source01/update.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python3
+
+import json
+import os
+import sys
+import time
+
+for i in range(3):
+    sys.stderr.write(f"{i+1}...\n")
+    sys.stderr.flush()
+    time.sleep(1)
+
+item = json.dumps({"id": "helloworld", "title": "Hello = " + os.environ.get("HELLO", "MISSING")})
+print(item)