Add submodule and cli for executing source fetches

This commit is contained in:
Tim Van Baak 2023-05-29 14:58:22 -07:00
parent 8ed3b85cad
commit 7afd5e1484
7 changed files with 307 additions and 3 deletions

View File

@ -0,0 +1,55 @@
# intake
`intake` is an arbitrary feed aggregator.
## Feed Source Interface
The base `intake` directory is `$XDG_DATA_HOME/intake`. Each feed source's data is contained within a subdirectory of the base directory. The name of the feed source is the name of the subdirectory.
Feed source directories have the following structure:
```
intake
|- <source name>
| |- intake.json
| |- state
| |- <item id>.item
| |- <item id>.item
| |- ...
|- <source name>
| | ...
| ...
```
`intake.json` must be present; the other files are optional. Each `.item` file contains the data for one feed item. `state` provides a file for the feed source to write arbitrary data, e.g. JSON or binary data.
`intake.json` has the following structure:
```
{
"fetch": {
"exe": "<absolute path to program or name on intake's PATH>",
"args": ["list", "of", "program", "arguments"]
},
"action": {
"<action name>": {
"exe": "...",
"args": "..."
}
},
"env": { ... }
}
```
`fetch` is required. If `action` or `env` are absent, they will be treated as if they were empty.
When a feed source is updated, `fetch.exe` will be executed with `fetch.args` as arguments. The following environment variables will be set:
* `STATE_PATH` is set to the absolute path of `state`.
* Each key in `env` in `config.json` is passed with its value.
Each line written to the process's `stdout` will be parsed as a JSON object representing a feed item. Each line written to `stderr` will be logged by intake. `stdout` and `stderr` are decoded as UTF-8.
If invalid JSON is written, intake will consider the feed update to be a failure. If the exit code is nonzero, intake will consider the feed update to be a failure, even if valid JSON was received. No changes will happen to the feed state as a result of a failed update.
Item actions are performed by executing `action.<name>.exe` with `action.<name>.args` as arguments.

View File

@ -1 +1 @@
HELLO = "WORLD"
HELLO = "WORLD"

View File

@ -1,9 +1,112 @@
from pathlib import Path
import argparse
import os
import os.path
import sys
def main():
from .source import fetch_items
from .types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
home = Path(os.environ["HOME"])
data_home = Path(os.environ.get("XDG_DATA_HOME", home / ".local" / "share"))
intake_data = data_home / "intake"
return intake_data
def cmd_fetch(cmd_args):
"""Execute the fetch for a source."""
parser = argparse.ArgumentParser(
prog="intake fetch",
description=cmd_fetch.__doc__,
)
parser.add_argument(
"--base",
default=intake_data_dir(),
help="Path to the intake data directory containing source directories",
)
parser.add_argument(
"--source",
help="Source name to fetch",
)
args = parser.parse_args(cmd_args)
ret = 0
source_path = Path(args.base) / args.source
try:
print("Hello, world!")
items = fetch_items(source_path)
for item in items:
print("Item:", item)
except InvalidConfigException as ex:
print("Could not fetch", args.source)
print(ex)
ret = 1
except SourceUpdateException as ex:
print("Error updating source", args.source)
print(ex)
ret = 1
return ret
def cmd_help(_):
"""Print the help text."""
print_usage()
return 0
def execute_cli():
"""
Internal entry point for CLI execution.
"""
# Collect the commands in this module.
cli = sys.modules[__name__]
commands = {
name[4:]: func for name, func in vars(cli).items() if name.startswith("cmd_")
}
names_width = max(map(len, commands.keys()))
desc_fmt = f" {{0:<{names_width}}} {{1}}"
descriptions = "\n".join(
[desc_fmt.format(name, func.__doc__) for name, func in commands.items()]
)
# Set up the top-level parser
parser = argparse.ArgumentParser(
prog="intake",
description=f"Available commands:\n{descriptions}\n",
formatter_class=argparse.RawDescriptionHelpFormatter,
# add_help=False,
)
parser.add_argument(
"command",
nargs="?",
default="help",
help="The command to execute",
choices=commands,
metavar="command",
)
parser.add_argument(
"args", nargs=argparse.REMAINDER, help="Command arguments", metavar="args"
)
# Extract the usage print for command_help
global print_usage
print_usage = parser.print_help
args = parser.parse_args()
# Execute command
sys.exit(commands[args.command](args.args))
def main():
"""
Main entry point for CLI execution.
"""
try:
execute_cli()
except BrokenPipeError:
# See https://docs.python.org/3.10/library/signal.html#note-on-sigpipe
devnull = os.open(os.devnull, os.O_WRONLY)

102
intake/source.py Executable file
View File

@ -0,0 +1,102 @@
from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread
import json
import os
import os.path
from .types import InvalidConfigException, SourceUpdateException
def read_stdout(process: Popen, outs: list):
"""
Read the subprocess's stdout into memory.
This prevents the process from blocking when the pipe fills up.
"""
while True:
data = process.stdout.readline()
if data:
print(f"[stdout] <{repr(data)}>")
outs.append(data)
if process.poll() is not None:
break
def read_stderr(process: Popen):
"""
Read the subprocess's stderr stream and pass it to logging.
This prevents the process from blocking when the pipe fills up.
"""
while True:
data = process.stderr.readline()
if data:
print(f"[stderr] <{repr(data)}>")
if process.poll() is not None:
break
def fetch_items(source_path: Path, update_timeout=60):
"""
Execute the feed source and return the current feed items.
Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed.
"""
# Load the source's config to get its update command
config_path = source_path / "intake.json"
with open(config_path, "r", encoding="utf8") as config_file:
config = json.load(config_file)
if "fetch" not in config:
raise InvalidConfigException("Missing exe")
exe_name = config["fetch"]["exe"]
exe_args = config["fetch"].get("args", [])
# Overlay the current env with the config env and intake-provided values
exe_env = {
**os.environ.copy(),
**config.get("env", {}),
"STATE_PATH": str((source_path / "state").absolute()),
}
# Launch the update command
try:
process = Popen(
[exe_name, *exe_args],
stdout=PIPE,
stderr=PIPE,
cwd=source_path,
env=exe_env,
encoding="utf8",
)
except PermissionError:
raise SourceUpdateException("command not executable")
# While the update command is executing, watch its output
t_stderr = Thread(target=read_stderr, args=(process,), daemon=True)
t_stderr.start()
outs = []
t_stdout = Thread(target=read_stdout, args=(process, outs), daemon=True)
t_stdout.start()
# Time out the process if it takes too long
try:
process.wait(timeout=update_timeout)
except TimeoutExpired:
process.kill()
t_stdout.join(timeout=1)
t_stderr.join(timeout=1)
if process.poll():
raise SourceUpdateException("return code")
items = []
for line in outs:
try:
item = json.loads(line)
items.append(item)
except json.JSONDecodeError:
raise SourceUpdateException("invalid json")
return items

21
intake/types.py Normal file
View File

@ -0,0 +1,21 @@
"""
Common exception types.
"""
class IntakeException(Exception):
"""
Base class for intake application exceptions.
"""
class InvalidConfigException(IntakeException):
"""
Could not interact with a source because the source's config was not valid.
"""
class SourceUpdateException(Exception):
"""
The source update process did not return valid data and signal success.
"""

View File

@ -0,0 +1,9 @@
{
"fetch": {
"exe": "python3",
"args": ["update.py"]
},
"env": {
"HELLO": "WORLD"
}
}

14
tests/source01/update.py Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
import json
import os
import sys
import time
for i in range(3):
sys.stderr.write(f"{i+1}...\n")
sys.stderr.flush()
time.sleep(1)
item = json.dumps({"id": "helloworld", "title": "Hello = " + os.environ.get("HELLO", "MISSING")})
print(item)