Compare commits
22 Commits
Author | SHA1 | Date | |
---|---|---|---|
bbe3f53cd4 | |||
718220f889 | |||
8337b74a80 | |||
527e37003c | |||
ec33495c56 | |||
d0780a9fd1 | |||
c903947f00 | |||
e72f9aa349 | |||
381de535f7 | |||
9bb331941f | |||
256a52e902 | |||
5cf7e8ef48 | |||
008c0c8230 | |||
0c4177b783 | |||
c21f55da8d | |||
4abb281715 | |||
92a10c5ca8 | |||
4947ce56c6 | |||
40ad075e96 | |||
887a75bb29 | |||
a5754e7023 | |||
75c13a21dc |
4
.gitignore
vendored
4
.gitignore
vendored
@ -159,6 +159,8 @@ cython_debug/
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
venv
|
||||
|
||||
# nixos-shell
|
||||
nixos.qcow2
|
||||
|
||||
@ -166,4 +168,4 @@ nixos.qcow2
|
||||
result
|
||||
|
||||
# test sources
|
||||
tests/**/*.item
|
||||
tests/**/*.item
|
||||
|
18
Makefile
Normal file
18
Makefile
Normal file
@ -0,0 +1,18 @@
|
||||
.PHONY: *
|
||||
|
||||
help: ## display this help
|
||||
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
|
||||
|
||||
venv: ## link the python dev environment to ./venv
|
||||
nix build .#pythonEnv -o venv
|
||||
|
||||
lint: ## run linters
|
||||
nix fmt
|
||||
black **/*.py
|
||||
|
||||
test: ## run pytest
|
||||
python -m pytest
|
||||
|
||||
reset: ## delete all items in tests/ and update each source once
|
||||
rm -v tests/*/*.item
|
||||
find tests -name intake.json -printf "%h\n" | cut -c7- | xargs -n 1 python -m intake update -d tests -s
|
17
README.md
17
README.md
@ -31,32 +31,35 @@ intake
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "<absolute path to program or name on intake's PATH>",
|
||||
"args": ["list", "of", "program", "arguments"]
|
||||
"args": ["program name", "and", "list", "of", "arguments"]
|
||||
},
|
||||
"<action name>": {
|
||||
"exe": "...",
|
||||
"args": "..."
|
||||
}
|
||||
},
|
||||
"env": {
|
||||
"...": "..."
|
||||
},
|
||||
"cron": "* * * * *"
|
||||
"cron": "* * * * *",
|
||||
"batch": "<number>"
|
||||
}
|
||||
```
|
||||
|
||||
Each key under `action` defines an action that can be taken for the source. An action must contain `exe` and may contain `args`. A source must have a `fetch` action.
|
||||
Each key under `action` defines an action that can be taken for the source. A source must at least have a `fetch` action. If an action named `on_create` is defined for the source, it is executed once for an item when that item is created, that is, the first time the item is returned from the source.
|
||||
|
||||
Each key under `env` defines an environment variable that will be set when actions are executed.
|
||||
Each key under `env` defines an environment variable that will be set when `fetch` or other actions are executed.
|
||||
|
||||
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
|
||||
|
||||
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
|
||||
|
||||
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
|
||||
|
||||
## Interface for source programs
|
||||
|
||||
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.
|
||||
|
||||
To execute an action, intake executes the `exe` program for the action with the corresponding `args` (if present) as arguments. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
|
||||
To execute an action, intake executes the command given by `args`. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
|
||||
|
||||
* intake's environment is inherited.
|
||||
* `STATE_PATH` is set to the absolute path of `state`.
|
||||
|
19
default.nix
19
default.nix
@ -1,10 +1,9 @@
|
||||
(import
|
||||
(
|
||||
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
|
||||
fetchTarball {
|
||||
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
|
||||
sha256 = lock.nodes.flake-compat.locked.narHash;
|
||||
}
|
||||
)
|
||||
{ src = ./.; }
|
||||
).defaultNix
|
||||
(import (
|
||||
let
|
||||
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
|
||||
in
|
||||
fetchTarball {
|
||||
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
|
||||
sha256 = lock.nodes.flake-compat.locked.narHash;
|
||||
}
|
||||
) { src = ./.; }).defaultNix
|
||||
|
@ -14,7 +14,7 @@
|
||||
users.users.bob = {
|
||||
isNormalUser = true;
|
||||
password = "beta";
|
||||
uid = 1001;
|
||||
uid = 1001;
|
||||
packages = [ pkgs.intake ];
|
||||
};
|
||||
|
||||
@ -26,11 +26,13 @@
|
||||
};
|
||||
|
||||
# Expose the vm's intake revproxy at host port 5234
|
||||
virtualisation.forwardPorts = [{
|
||||
from = "host";
|
||||
host.port = 5234;
|
||||
guest.port = 8080;
|
||||
}];
|
||||
virtualisation.forwardPorts = [
|
||||
{
|
||||
from = "host";
|
||||
host.port = 5234;
|
||||
guest.port = 8080;
|
||||
}
|
||||
];
|
||||
|
||||
# Mount the demo content for both users
|
||||
nixos-shell.mounts = {
|
||||
@ -47,20 +49,20 @@
|
||||
|
||||
# Create an activation script that copies and chowns the demo content
|
||||
# chmod 777 because the users may not exist when the activation script runs
|
||||
system.activationScripts =
|
||||
let
|
||||
userSetup = name: uid: ''
|
||||
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
|
||||
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
|
||||
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
|
||||
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
|
||||
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
|
||||
'';
|
||||
in
|
||||
{
|
||||
aliceSetup = userSetup "alice" "1000";
|
||||
bobSetup = userSetup "bob" "1001";
|
||||
};
|
||||
system.activationScripts =
|
||||
let
|
||||
userSetup = name: uid: ''
|
||||
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
|
||||
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
|
||||
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
|
||||
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
|
||||
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
|
||||
'';
|
||||
in
|
||||
{
|
||||
aliceSetup = userSetup "alice" "1000";
|
||||
bobSetup = userSetup "bob" "1001";
|
||||
};
|
||||
|
||||
# Put the demo sources on the global PATH
|
||||
environment.variables.PATH = "/mnt/sources";
|
||||
|
135
flake.nix
135
flake.nix
@ -13,61 +13,90 @@
|
||||
nixos-shell.inputs.nixpkgs.follows = "nixpkgs";
|
||||
};
|
||||
|
||||
outputs = { self, nixpkgs, flake-compat, nixos-shell }:
|
||||
let
|
||||
inherit (nixpkgs.lib) makeOverridable nixosSystem;
|
||||
system = "x86_64-linux";
|
||||
in {
|
||||
packages.${system} = let
|
||||
pkgs = (import nixpkgs {
|
||||
outputs =
|
||||
{
|
||||
self,
|
||||
nixpkgs,
|
||||
flake-compat,
|
||||
nixos-shell,
|
||||
}:
|
||||
let
|
||||
inherit (nixpkgs.lib) makeOverridable nixosSystem;
|
||||
system = "x86_64-linux";
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
pythonEnv = pkgs.python3.withPackages (
|
||||
pypkgs: with pypkgs; [
|
||||
flask
|
||||
black
|
||||
pytest
|
||||
]
|
||||
);
|
||||
in
|
||||
{
|
||||
formatter.${system} = nixpkgs.legacyPackages.${system}.nixfmt-rfc-style;
|
||||
|
||||
packages.${system} =
|
||||
let
|
||||
pkgs = (
|
||||
import nixpkgs {
|
||||
inherit system;
|
||||
overlays = [ self.overlays.default ];
|
||||
}
|
||||
);
|
||||
in
|
||||
{
|
||||
default = self.packages.${system}.intake;
|
||||
inherit (pkgs) intake;
|
||||
inherit pythonEnv;
|
||||
};
|
||||
|
||||
devShells.${system} = {
|
||||
default = pkgs.mkShell {
|
||||
packages = [
|
||||
pythonEnv
|
||||
pkgs.gnumake
|
||||
pkgs.nixos-shell
|
||||
# We only take this dependency for htpasswd, which is a little unfortunate
|
||||
pkgs.apacheHttpd
|
||||
];
|
||||
shellHook = ''
|
||||
PS1="(develop) $PS1"
|
||||
'';
|
||||
};
|
||||
};
|
||||
|
||||
overlays.default = final: prev: {
|
||||
intake = final.python3Packages.buildPythonPackage {
|
||||
name = "intake";
|
||||
src = builtins.path {
|
||||
path = ./.;
|
||||
name = "intake";
|
||||
};
|
||||
format = "pyproject";
|
||||
propagatedBuildInputs = with final.python3Packages; [
|
||||
flask
|
||||
setuptools
|
||||
];
|
||||
};
|
||||
};
|
||||
|
||||
templates.source = {
|
||||
path = builtins.path {
|
||||
path = ./template;
|
||||
name = "source";
|
||||
};
|
||||
description = "A basic intake source config";
|
||||
};
|
||||
|
||||
nixosModules.default = import ./module.nix self;
|
||||
|
||||
nixosConfigurations."demo" = makeOverridable nixosSystem {
|
||||
inherit system;
|
||||
overlays = [ self.overlays.default ];
|
||||
});
|
||||
in {
|
||||
default = self.packages.${system}.intake;
|
||||
inherit (pkgs) intake;
|
||||
};
|
||||
|
||||
devShells.${system} = {
|
||||
default = let
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
pythonEnv = pkgs.python3.withPackages (pypkgs: with pypkgs; [ flask black pytest ]);
|
||||
in pkgs.mkShell {
|
||||
packages = [
|
||||
pythonEnv
|
||||
pkgs.nixos-shell
|
||||
# We only take this dependency for htpasswd, which is a little unfortunate
|
||||
pkgs.apacheHttpd
|
||||
modules = [
|
||||
nixos-shell.nixosModules.nixos-shell
|
||||
self.nixosModules.default
|
||||
./demo
|
||||
];
|
||||
shellHook = ''
|
||||
PS1="(develop) $PS1"
|
||||
'';
|
||||
};
|
||||
};
|
||||
|
||||
overlays.default = final: prev: {
|
||||
intake = final.python3Packages.buildPythonPackage {
|
||||
name = "intake";
|
||||
src = builtins.path { path = ./.; name = "intake"; };
|
||||
format = "pyproject";
|
||||
propagatedBuildInputs = with final.python3Packages; [ flask setuptools ];
|
||||
};
|
||||
};
|
||||
|
||||
templates.source = {
|
||||
path = builtins.path { path = ./template; name = "source"; };
|
||||
description = "A basic intake source config";
|
||||
};
|
||||
|
||||
nixosModules.default = import ./module.nix self;
|
||||
|
||||
nixosConfigurations."demo" = makeOverridable nixosSystem {
|
||||
inherit system;
|
||||
modules = [
|
||||
nixos-shell.nixosModules.nixos-shell
|
||||
self.nixosModules.default
|
||||
./demo
|
||||
];
|
||||
};
|
||||
};
|
||||
}
|
||||
|
@ -18,9 +18,16 @@ from flask import (
|
||||
current_app,
|
||||
)
|
||||
|
||||
from intake.core import intake_data_dir
|
||||
from intake.crontab import update_crontab_entries
|
||||
from intake.source import LocalSource, execute_action, Item
|
||||
from intake.source import (
|
||||
LocalSource,
|
||||
execute_action,
|
||||
Item,
|
||||
fetch_items,
|
||||
update_items,
|
||||
intake_data_dir,
|
||||
)
|
||||
from intake.types import InvalidConfigException, SourceUpdateException
|
||||
|
||||
# Globals
|
||||
app = Flask(__name__)
|
||||
@ -115,10 +122,23 @@ def root():
|
||||
if channels_config_path.exists():
|
||||
channels = json.loads(channels_config_path.read_text(encoding="utf8"))
|
||||
|
||||
channel_counts = {
|
||||
channel: len(
|
||||
[
|
||||
item
|
||||
for source in sources
|
||||
for item in LocalSource(data_path, source).get_all_items()
|
||||
if not item.is_hidden
|
||||
]
|
||||
)
|
||||
for channel, sources in channels.items()
|
||||
}
|
||||
|
||||
return render_template(
|
||||
"home.jinja2",
|
||||
sources=sources,
|
||||
channels=channels,
|
||||
channel_counts=channel_counts,
|
||||
)
|
||||
|
||||
|
||||
@ -306,13 +326,15 @@ def _parse_source_config(config_str: str):
|
||||
if "fetch" not in action:
|
||||
return ("No fetch action defined", {})
|
||||
fetch = action["fetch"]
|
||||
if "exe" not in fetch:
|
||||
if "exe" not in fetch and not fetch.get("args"):
|
||||
return ("No fetch exe", {})
|
||||
config = {"action": parsed["action"]}
|
||||
if "env" in parsed:
|
||||
config["env"] = parsed["env"]
|
||||
if "cron" in parsed:
|
||||
config["cron"] = parsed["cron"]
|
||||
if "batch" in parsed:
|
||||
config["batch"] = parsed["batch"]
|
||||
return (None, config)
|
||||
|
||||
|
||||
@ -368,6 +390,24 @@ def _parse_channels_config(config_str: str):
|
||||
return (None, parsed)
|
||||
|
||||
|
||||
@app.post("/fetch/<string:source_name>")
|
||||
@auth_check
|
||||
def fetch(source_name: str):
|
||||
data_path: Path = current_app.config["INTAKE_DATA"]
|
||||
source = LocalSource(data_path, source_name)
|
||||
|
||||
try:
|
||||
items = fetch_items(source)
|
||||
update_items(source, items)
|
||||
titles = "\n".join(f"<li>{item.display_title}</li>" for item in items)
|
||||
source_url = url_for("source_feed", name=source_name)
|
||||
return f'Update returned {len(items)} items:<ul>{titles}</ul><p><a href="{source_url}">{source_name}</a></p>'
|
||||
except InvalidConfigException as ex:
|
||||
abort(500, f"Could not fetch {source_name}:\n{ex}")
|
||||
except SourceUpdateException as ex:
|
||||
abort(500, f"Error updating source {source_name}:\n{ex}")
|
||||
|
||||
|
||||
@app.post("/add")
|
||||
@auth_check
|
||||
def add_item():
|
||||
@ -379,7 +419,7 @@ def add_item():
|
||||
config_path = source_path / "intake.json"
|
||||
if not config_path.exists():
|
||||
config_path.write_text(
|
||||
json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2)
|
||||
json.dumps({"action": {"fetch": {"args": ["true"]}}}, indent=2)
|
||||
)
|
||||
source = LocalSource(source_path.parent, source_path.name)
|
||||
|
||||
|
@ -10,9 +10,14 @@ import pwd
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from intake.core import intake_data_dir
|
||||
from intake.crontab import update_crontab_entries
|
||||
from intake.source import fetch_items, LocalSource, update_items, execute_action
|
||||
from intake.source import (
|
||||
fetch_items,
|
||||
LocalSource,
|
||||
update_items,
|
||||
execute_action,
|
||||
intake_data_dir,
|
||||
)
|
||||
from intake.types import InvalidConfigException, SourceUpdateException
|
||||
|
||||
|
||||
@ -52,7 +57,6 @@ def cmd_edit(cmd_args):
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "",
|
||||
"args": [],
|
||||
},
|
||||
},
|
||||
|
@ -1,12 +0,0 @@
|
||||
from pathlib import Path
|
||||
import os
|
||||
|
||||
|
||||
def intake_data_dir() -> Path:
|
||||
if intake_data := os.environ.get("INTAKE_DATA"):
|
||||
return Path(intake_data)
|
||||
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
|
||||
return Path(xdg_data_home) / "intake"
|
||||
if home := os.environ.get("HOME"):
|
||||
return Path(home) / ".local" / "share" / "intake"
|
||||
raise Exception("No intake data directory defined")
|
@ -1,5 +1,6 @@
|
||||
from pathlib import Path
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
@ -28,15 +29,13 @@ def update_crontab_entries(data_path: Path):
|
||||
Update the intake-managed section of the user's crontab.
|
||||
"""
|
||||
# If there is no crontab command available, quit early.
|
||||
cmd = ("command", "-v", "crontab")
|
||||
print("Executing", *cmd, file=sys.stderr)
|
||||
crontab_exists = subprocess.run(cmd, shell=True)
|
||||
if crontab_exists.returncode:
|
||||
print("Could not update crontab", file=sys.stderr)
|
||||
crontab = shutil.which("crontab")
|
||||
if not crontab:
|
||||
print("No crontab found, skipping", file=sys.stderr)
|
||||
return
|
||||
|
||||
# Get the current crontab
|
||||
cmd = ["crontab", "-e"]
|
||||
cmd = [crontab, "-e"]
|
||||
print("Executing", *cmd, file=sys.stderr)
|
||||
get_crontab = subprocess.run(
|
||||
cmd,
|
||||
@ -77,7 +76,7 @@ def update_crontab_entries(data_path: Path):
|
||||
print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr)
|
||||
|
||||
# Save the updated crontab
|
||||
cmd = ["crontab", "-"]
|
||||
cmd = [crontab, "-"]
|
||||
print("Executing", *cmd, file=sys.stderr)
|
||||
new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8")
|
||||
save_crontab = subprocess.Popen(
|
||||
|
@ -3,7 +3,7 @@ from pathlib import Path
|
||||
from subprocess import Popen, PIPE, TimeoutExpired
|
||||
from threading import Thread
|
||||
from time import time as current_time
|
||||
from typing import List
|
||||
from typing import Generator
|
||||
import json
|
||||
import os
|
||||
import os.path
|
||||
@ -12,6 +12,16 @@ import sys
|
||||
from intake.types import InvalidConfigException, SourceUpdateException
|
||||
|
||||
|
||||
def intake_data_dir() -> Path:
|
||||
if intake_data := os.environ.get("INTAKE_DATA"):
|
||||
return Path(intake_data)
|
||||
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
|
||||
return Path(xdg_data_home) / "intake"
|
||||
if home := os.environ.get("HOME"):
|
||||
return Path(home) / ".local" / "share" / "intake"
|
||||
raise Exception("No intake data directory defined")
|
||||
|
||||
|
||||
class Item:
|
||||
"""
|
||||
A wrapper for an item object.
|
||||
@ -71,26 +81,31 @@ class Item:
|
||||
# The time-to-live fields protects an item from removal until expiry.
|
||||
# This is mainly used to avoid old items resurfacing when their source
|
||||
# cannot guarantee monotonocity.
|
||||
if "ttl" in self._item:
|
||||
ttl_date = self._item["created"] + self._item["ttl"]
|
||||
if ttl_date > current_time():
|
||||
return False
|
||||
if "ttl" in self._item and self.ttl_at > current_time():
|
||||
return False
|
||||
|
||||
# The time-to-die field puts a maximum lifespan on an item, removing it
|
||||
# even if it is active.
|
||||
if "ttd" in self._item:
|
||||
ttd_date = self._item["created"] + self._item["ttd"]
|
||||
if ttd_date < current_time():
|
||||
return True
|
||||
if "ttd" in self._item and self.ttd_at < current_time():
|
||||
return True
|
||||
|
||||
return not self._item["active"]
|
||||
|
||||
@property
|
||||
def tts_at(self):
|
||||
return self._item["created"] + self._item.get("tts", 0)
|
||||
|
||||
@property
|
||||
def ttl_at(self):
|
||||
return self._item["created"] + self._item.get("ttl", 0)
|
||||
|
||||
@property
|
||||
def ttd_at(self):
|
||||
return self._item["created"] + self._item.get("ttd", 0)
|
||||
|
||||
@property
|
||||
def before_tts(self):
|
||||
return (
|
||||
"tts" in self._item
|
||||
and current_time() < self._item["created"] + self._item["tts"]
|
||||
)
|
||||
return "tts" in self._item and current_time() < self.tts_at
|
||||
|
||||
@property
|
||||
def is_hidden(self):
|
||||
@ -158,7 +173,7 @@ class LocalSource:
|
||||
def get_item_path(self, item_id: dict) -> Path:
|
||||
return self.source_path / f"{item_id}.item"
|
||||
|
||||
def get_item_ids(self) -> List[str]:
|
||||
def get_item_ids(self) -> list[str]:
|
||||
return [
|
||||
filepath.name[:-5]
|
||||
for filepath in self.source_path.iterdir()
|
||||
@ -183,7 +198,7 @@ class LocalSource:
|
||||
def delete_item(self, item_id) -> None:
|
||||
os.remove(self.get_item_path(item_id))
|
||||
|
||||
def get_all_items(self) -> List[Item]:
|
||||
def get_all_items(self) -> Generator[Item, None, None]:
|
||||
for filepath in self.source_path.iterdir():
|
||||
if filepath.name.endswith(".item"):
|
||||
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
|
||||
@ -218,7 +233,7 @@ def _read_stderr(process: Popen) -> None:
|
||||
|
||||
def _execute_source_action(
|
||||
source: LocalSource, action: str, input: str, timeout: timedelta
|
||||
) -> List[str]:
|
||||
) -> list[str]:
|
||||
"""
|
||||
Execute the action from a given source. If stdin is specified, pass it
|
||||
along to the process. Returns lines from stdout.
|
||||
@ -229,10 +244,13 @@ def _execute_source_action(
|
||||
|
||||
if not action_cfg:
|
||||
raise InvalidConfigException(f"No such action {action}")
|
||||
if "exe" not in action_cfg:
|
||||
|
||||
exe = [action_cfg["exe"]] if "exe" in action_cfg else []
|
||||
command = exe + action_cfg.get("args", [])
|
||||
|
||||
if not command:
|
||||
raise InvalidConfigException(f"No exe for action {action}")
|
||||
|
||||
command = [action_cfg["exe"], *action_cfg.get("args", [])]
|
||||
config_env = {key: str(value) for key, value in config.get("env", {}).items()}
|
||||
env = {
|
||||
**os.environ.copy(),
|
||||
@ -283,13 +301,13 @@ def _execute_source_action(
|
||||
return output
|
||||
|
||||
|
||||
def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]:
|
||||
def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]:
|
||||
"""
|
||||
Execute the feed source and return the current feed items.
|
||||
Returns a list of feed items on success.
|
||||
Throws SourceUpdateException if the feed source update failed.
|
||||
"""
|
||||
items: List[Item] = []
|
||||
items: list[Item] = []
|
||||
|
||||
output = _execute_source_action(source, "fetch", None, timedelta(timeout))
|
||||
|
||||
@ -325,29 +343,54 @@ def execute_action(
|
||||
raise SourceUpdateException("invalid json")
|
||||
|
||||
|
||||
def update_items(source: LocalSource, fetched_items: List[Item]):
|
||||
def update_items(source: LocalSource, fetched_items: list[Item]):
|
||||
"""
|
||||
Update the source with a batch of new items, doing creations, updates, and
|
||||
deletions as necessary.
|
||||
"""
|
||||
config = source.get_config()
|
||||
|
||||
# Get a list of item ids that already existed for this source.
|
||||
prior_ids = source.get_item_ids()
|
||||
print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
|
||||
|
||||
# Determine which items are new and which are updates.
|
||||
new_items: List[Item] = []
|
||||
upd_items: List[Item] = []
|
||||
new_items: list[Item] = []
|
||||
upd_items: list[Item] = []
|
||||
for item in fetched_items:
|
||||
if source.item_exists(item["id"]):
|
||||
upd_items.append(item)
|
||||
else:
|
||||
new_items.append(item)
|
||||
|
||||
# If the source is batched, set the tts on new items to at least the batch tts
|
||||
if "batch" in config:
|
||||
try:
|
||||
batch_adj = int(config["batch"])
|
||||
now = current_time() - batch_adj
|
||||
batch_start = now - (now % 86400)
|
||||
batch_end = batch_start + 86400 + batch_adj
|
||||
for item in new_items:
|
||||
min_tts = batch_end - item["created"]
|
||||
item["tts"] = min(min_tts, item.get("tts", min_tts))
|
||||
except:
|
||||
pass
|
||||
|
||||
# Write all the new items to the source directory.
|
||||
for item in new_items:
|
||||
# TODO: support on-create trigger
|
||||
source.save_item(item)
|
||||
|
||||
# If the source has an on-create trigger, run it for new items
|
||||
if "on_create" in config.get("action", {}):
|
||||
for item in new_items:
|
||||
try:
|
||||
execute_action(source, item["id"], "on_create")
|
||||
except Exception as ex:
|
||||
print(
|
||||
f"on_create failed for {source.source_name}/{item['id']}:\n{ex}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Update the other items using the fetched items' values.
|
||||
for upd_item in upd_items:
|
||||
old_item = source.get_item(upd_item["id"])
|
||||
|
@ -174,11 +174,13 @@ var doAction = function (source, itemid, action) {
|
||||
|
||||
{# source/id/created footer line #}
|
||||
{% if item.source or item.id or item.created %}
|
||||
<span class="item-info" title="{{ 'Tags: {}'.format(', '.join(item.tags)) }}">
|
||||
<span class="item-info" title="{% if item.tags %}{{ 'Tags: {}'.format(', '.join(item.tags)) }}{% else %}No tags{% endif %}">
|
||||
{% if item.source %}{{item.source}}{% endif %}
|
||||
{% if item.id %}{{item.id}}{% endif %}
|
||||
{% if item.created %}{{item.created|datetimeformat}}{% endif %}
|
||||
{% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %}
|
||||
{% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %}
|
||||
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
|
||||
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
|
||||
</span>
|
||||
{% endif %}
|
||||
|
||||
|
@ -34,6 +34,12 @@ summary:focus {
|
||||
width: 100%;
|
||||
resize: vertical;
|
||||
}
|
||||
.intake-sources td {
|
||||
padding-block: 0.4em;
|
||||
}
|
||||
.intake-sources form {
|
||||
margin: 0
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
@ -46,7 +52,13 @@ summary:focus {
|
||||
<p>No channels found.</p>
|
||||
{% else %}
|
||||
{% for channel in channels %}
|
||||
<p><a href="{{ url_for('channel_feed', name=channel) }}">{{ channel }}</a></p>
|
||||
<p><a href="{{ url_for('channel_feed', name=channel) }}">
|
||||
{%- if channel_counts[channel] -%}
|
||||
({{ channel_counts[channel] }}) {{ channel }}
|
||||
{%- else -%}
|
||||
{{ channel }}
|
||||
{%- endif -%}
|
||||
</a></p>
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
<p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p>
|
||||
@ -59,17 +71,22 @@ summary:focus {
|
||||
{% if not sources %}
|
||||
<p>No sources found.</p>
|
||||
{% else %}
|
||||
<table class="intake-sources">
|
||||
{% for source in sources %}
|
||||
<p>
|
||||
<tr>
|
||||
<td>
|
||||
{%- for channel, srcs in channels|items -%}
|
||||
{%- if source.source_name in srcs -%}
|
||||
^
|
||||
{%- endif -%}
|
||||
{%- endfor -%}
|
||||
<a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a>
|
||||
(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)
|
||||
</p>
|
||||
</td>
|
||||
<td><form id="{{ source.source_name }}"><button type="submit" formmethod=post formaction="{{ url_for('fetch', source_name=source.source_name) }}" />fetch</button></form></td>
|
||||
<td>(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)</td>
|
||||
<td><a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a></td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
{% endif %}
|
||||
</details>
|
||||
</article>
|
||||
|
244
module.nix
244
module.nix
@ -1,153 +1,179 @@
|
||||
flake: { config, lib, pkgs, ... }:
|
||||
flake:
|
||||
{
|
||||
config,
|
||||
lib,
|
||||
pkgs,
|
||||
...
|
||||
}:
|
||||
|
||||
let
|
||||
inherit (lib) filterAttrs foldl imap1 mapAttrsToList mkEnableOption mkIf mkMerge mkOption mkPackageOption types;
|
||||
inherit (lib)
|
||||
filterAttrs
|
||||
foldl
|
||||
imap1
|
||||
mapAttrsToList
|
||||
mkEnableOption
|
||||
mkIf
|
||||
mkMerge
|
||||
mkOption
|
||||
mkPackageOption
|
||||
types
|
||||
;
|
||||
intakeCfg = config.services.intake;
|
||||
in {
|
||||
in
|
||||
{
|
||||
options = {
|
||||
services.intake = {
|
||||
listen.addr = mkOption {
|
||||
type = types.str;
|
||||
default = "0.0.0.0";
|
||||
description = "The listen address for the entry point to intake services. This endpoint will redirect to a "
|
||||
"local port based on the request's HTTP Basic Auth credentials.";
|
||||
description = "The listen address for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
|
||||
};
|
||||
|
||||
listen.port = mkOption {
|
||||
type = types.port;
|
||||
default = 80;
|
||||
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local "
|
||||
"port based on the request's HTTP Basic Auth credentials.";
|
||||
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
|
||||
};
|
||||
|
||||
package = mkPackageOption pkgs "intake" {};
|
||||
package = mkPackageOption pkgs "intake" { };
|
||||
|
||||
internalPortStart = mkOption {
|
||||
type = types.port;
|
||||
default = 24130;
|
||||
description = "The first port to use for internal service endpoints. A number of ports will be continguously "
|
||||
"allocated equal to the number of users with enabled intake services.";
|
||||
description = "The first port to use for internal service endpoints. A number of ports will be continguously allocated equal to the number of users with enabled intake services.";
|
||||
};
|
||||
|
||||
extraPackages = mkOption {
|
||||
type = types.listOf types.package;
|
||||
default = [];
|
||||
default = [ ];
|
||||
description = "Extra packages available to all enabled users and their intake services.";
|
||||
};
|
||||
|
||||
users = mkOption {
|
||||
description = "User intake service definitions.";
|
||||
default = {};
|
||||
type = types.attrsOf (types.submodule {
|
||||
options = {
|
||||
enable = mkEnableOption "intake, a personal feed aggregator.";
|
||||
default = { };
|
||||
type = types.attrsOf (
|
||||
types.submodule {
|
||||
options = {
|
||||
enable = mkEnableOption "intake, a personal feed aggregator.";
|
||||
|
||||
extraPackages = mkOption {
|
||||
type = types.listOf types.package;
|
||||
default = [];
|
||||
description = "Extra packages available to this user and their intake service.";
|
||||
extraPackages = mkOption {
|
||||
type = types.listOf types.package;
|
||||
default = [ ];
|
||||
description = "Extra packages available to this user and their intake service.";
|
||||
};
|
||||
};
|
||||
};
|
||||
});
|
||||
}
|
||||
);
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
config =
|
||||
let
|
||||
# Define the intake package and a python environment to run it from
|
||||
intake = intakeCfg.package;
|
||||
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
|
||||
|
||||
# Assign each user an internal port for their personal intake instance
|
||||
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
|
||||
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
|
||||
userPortList = imap1 (i: userName: { ${userName} = i + intakeCfg.internalPortStart; }) enabledUserNames;
|
||||
userPort = foldl (acc: val: acc // val) {} userPortList;
|
||||
|
||||
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
|
||||
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
|
||||
${pkgs.apacheHttpd}/bin/htpasswd $@
|
||||
'';
|
||||
|
||||
# File locations
|
||||
intakeDir = "/etc/intake";
|
||||
intakePwd = "${intakeDir}/htpasswd";
|
||||
in {
|
||||
# Apply the overlay so intake is included in pkgs.
|
||||
nixpkgs.overlays = [ flake.overlays.default ];
|
||||
|
||||
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
|
||||
users.groups.intake.members = mkIf (enabledUsers != {}) (enabledUserNames ++ [ "nginx" ]);
|
||||
|
||||
# Define an activation script that ensures that the htpasswd file exists.
|
||||
system.activationScripts.etc-intake = ''
|
||||
if [ ! -e ${intakeDir} ]; then
|
||||
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
|
||||
fi
|
||||
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
|
||||
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
|
||||
if [ ! -e ${intakePwd} ]; then
|
||||
${pkgs.coreutils}/bin/touch ${intakePwd}
|
||||
fi
|
||||
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
|
||||
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
|
||||
'';
|
||||
|
||||
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
|
||||
users.users =
|
||||
let
|
||||
addPackagesToUser = userName: {
|
||||
${userName}.packages =
|
||||
[ htpasswdWrapper intake ]
|
||||
++ intakeCfg.extraPackages
|
||||
++ intakeCfg.users.${userName}.extraPackages;
|
||||
};
|
||||
in mkMerge (map addPackagesToUser enabledUserNames);
|
||||
# Define the intake package and a python environment to run it from
|
||||
intake = intakeCfg.package;
|
||||
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
|
||||
|
||||
# Enable cron
|
||||
services.cron.enable = true;
|
||||
# Assign each user an internal port for their personal intake instance
|
||||
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
|
||||
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
|
||||
userPortList = imap1 (i: userName: {
|
||||
${userName} = i + intakeCfg.internalPortStart;
|
||||
}) enabledUserNames;
|
||||
userPort = foldl (acc: val: acc // val) { } userPortList;
|
||||
|
||||
# Define a user service for each configured user
|
||||
systemd.services =
|
||||
let
|
||||
runScript = userName: pkgs.writeShellScript "intake-run.sh" ''
|
||||
# Add the setuid wrapper directory so `crontab` is accessible
|
||||
export PATH="${config.security.wrapperDir}:$PATH"
|
||||
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
|
||||
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
|
||||
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
|
||||
${pkgs.apacheHttpd}/bin/htpasswd $@
|
||||
'';
|
||||
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
|
||||
userServiceConfig = userName: userCfg: {
|
||||
"intake@${userName}" = {
|
||||
description = "Intake service for user ${userName}";
|
||||
script = "${runScript userName}";
|
||||
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
|
||||
serviceConfig = {
|
||||
User = userName;
|
||||
Type = "simple";
|
||||
};
|
||||
wantedBy = [ "multi-user.target" ];
|
||||
after = [ "network.target" ];
|
||||
enable = userCfg.enable;
|
||||
};
|
||||
};
|
||||
in mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
|
||||
|
||||
# Define an nginx reverse proxy to request auth
|
||||
services.nginx = mkIf (enabledUsers != {}) {
|
||||
enable = true;
|
||||
virtualHosts."intake" = mkIf (enabledUsers != {}) {
|
||||
listen = [ intakeCfg.listen ];
|
||||
locations."/" = {
|
||||
proxyPass = "http://127.0.0.1:$target_port";
|
||||
basicAuthFile = intakePwd;
|
||||
# File locations
|
||||
intakeDir = "/etc/intake";
|
||||
intakePwd = "${intakeDir}/htpasswd";
|
||||
in
|
||||
{
|
||||
# Apply the overlay so intake is included in pkgs.
|
||||
nixpkgs.overlays = [ flake.overlays.default ];
|
||||
|
||||
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
|
||||
users.groups.intake.members = mkIf (enabledUsers != { }) (enabledUserNames ++ [ "nginx" ]);
|
||||
|
||||
# Define an activation script that ensures that the htpasswd file exists.
|
||||
system.activationScripts.etc-intake = ''
|
||||
if [ ! -e ${intakeDir} ]; then
|
||||
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
|
||||
fi
|
||||
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
|
||||
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
|
||||
if [ ! -e ${intakePwd} ]; then
|
||||
${pkgs.coreutils}/bin/touch ${intakePwd}
|
||||
fi
|
||||
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
|
||||
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
|
||||
'';
|
||||
|
||||
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
|
||||
users.users =
|
||||
let
|
||||
addPackagesToUser = userName: {
|
||||
${userName}.packages = [
|
||||
htpasswdWrapper
|
||||
intake
|
||||
] ++ intakeCfg.extraPackages ++ intakeCfg.users.${userName}.extraPackages;
|
||||
};
|
||||
in
|
||||
mkMerge (map addPackagesToUser enabledUserNames);
|
||||
|
||||
# Enable cron
|
||||
services.cron.enable = true;
|
||||
|
||||
# Define a user service for each configured user
|
||||
systemd.services =
|
||||
let
|
||||
runScript =
|
||||
userName:
|
||||
pkgs.writeShellScript "intake-run.sh" ''
|
||||
# Add the setuid wrapper directory so `crontab` is accessible
|
||||
export PATH="${config.security.wrapperDir}:$PATH"
|
||||
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
|
||||
'';
|
||||
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
|
||||
userServiceConfig = userName: userCfg: {
|
||||
"intake@${userName}" = {
|
||||
description = "Intake service for user ${userName}";
|
||||
script = "${runScript userName}";
|
||||
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
|
||||
serviceConfig = {
|
||||
User = userName;
|
||||
Type = "simple";
|
||||
};
|
||||
wantedBy = [ "multi-user.target" ];
|
||||
after = [ "network.target" ];
|
||||
enable = userCfg.enable;
|
||||
};
|
||||
};
|
||||
in
|
||||
mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
|
||||
|
||||
# Define an nginx reverse proxy to request auth
|
||||
services.nginx = mkIf (enabledUsers != { }) {
|
||||
enable = true;
|
||||
virtualHosts."intake" = mkIf (enabledUsers != { }) {
|
||||
listen = [ intakeCfg.listen ];
|
||||
locations."/" = {
|
||||
proxyPass = "http://127.0.0.1:$target_port";
|
||||
basicAuthFile = intakePwd;
|
||||
};
|
||||
extraConfig = foldl (acc: val: acc + val) "" (
|
||||
mapAttrsToList (userName: port: ''
|
||||
if ($remote_user = "${userName}") {
|
||||
set $target_port ${toString port};
|
||||
}
|
||||
'') userPort
|
||||
);
|
||||
};
|
||||
extraConfig = foldl (acc: val: acc + val) "" (mapAttrsToList (userName: port: ''
|
||||
if ($remote_user = "${userName}") {
|
||||
set $target_port ${toString port};
|
||||
}
|
||||
'') userPort);
|
||||
};
|
||||
};
|
||||
};
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "intake"
|
||||
version = "1.1.0"
|
||||
version = "1.3.1"
|
||||
|
||||
[project.scripts]
|
||||
intake = "intake.cli:main"
|
||||
|
19
shell.nix
19
shell.nix
@ -1,10 +1,9 @@
|
||||
(import
|
||||
(
|
||||
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
|
||||
fetchTarball {
|
||||
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
|
||||
sha256 = lock.nodes.flake-compat.locked.narHash;
|
||||
}
|
||||
)
|
||||
{ src = ./.; }
|
||||
).shellNix
|
||||
(import (
|
||||
let
|
||||
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
|
||||
in
|
||||
fetchTarball {
|
||||
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
|
||||
sha256 = lock.nodes.flake-compat.locked.narHash;
|
||||
}
|
||||
) { src = ./.; }).shellNix
|
||||
|
@ -2,6 +2,7 @@
|
||||
"demo": [
|
||||
"demo_basic_callback",
|
||||
"demo_logging",
|
||||
"demo_raw_sh"
|
||||
"demo_raw_sh",
|
||||
"demo_oncreate"
|
||||
]
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
from pathlib import Path
|
||||
from typing import List, Callable
|
||||
from typing import Generator, Callable
|
||||
|
||||
import pytest
|
||||
|
||||
@ -14,15 +14,16 @@ def clean_source(source_path: Path):
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def using_source() -> Callable:
|
||||
def using_source() -> Generator[Callable, None, LocalSource]:
|
||||
test_data = Path(__file__).parent
|
||||
sources: List[Path] = []
|
||||
sources: list[Path] = []
|
||||
|
||||
def _using_source(name: str):
|
||||
source_path = test_data / name
|
||||
clean_source(source_path)
|
||||
sources.append(source_path)
|
||||
return LocalSource(test_data, name)
|
||||
|
||||
yield _using_source
|
||||
|
||||
for source_path in sources:
|
||||
|
@ -1,7 +1,9 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "true"
|
||||
"args": [
|
||||
"true"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
@ -1,20 +1,20 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "./increment.py",
|
||||
"args": [
|
||||
"./increment.py",
|
||||
"fetch"
|
||||
]
|
||||
},
|
||||
"increment": {
|
||||
"exe": "./increment.py",
|
||||
"args": [
|
||||
"./increment.py",
|
||||
"increment"
|
||||
]
|
||||
},
|
||||
"decrement": {
|
||||
"exe": "./increment.py",
|
||||
"args": [
|
||||
"./increment.py",
|
||||
"decrement"
|
||||
]
|
||||
}
|
||||
|
@ -1,8 +1,8 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "python3",
|
||||
"args": [
|
||||
"python3",
|
||||
"update.py"
|
||||
]
|
||||
}
|
||||
|
22
tests/demo_oncreate/intake.json
Normal file
22
tests/demo_oncreate/intake.json
Normal file
@ -0,0 +1,22 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"args": [
|
||||
"./update.py",
|
||||
"fetch"
|
||||
]
|
||||
},
|
||||
"update": {
|
||||
"args": [
|
||||
"./update.py",
|
||||
"update"
|
||||
]
|
||||
},
|
||||
"on_create": {
|
||||
"args": [
|
||||
"./update.py",
|
||||
"update"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
29
tests/demo_oncreate/update.py
Executable file
29
tests/demo_oncreate/update.py
Executable file
@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse, json, sys, time
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("action")
|
||||
args = parser.parse_args()
|
||||
|
||||
print("args:", args, file=sys.stderr, flush=True)
|
||||
|
||||
if args.action == "fetch":
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"id": str(int(time.time())),
|
||||
"title": "Title has not been updated",
|
||||
"action": {
|
||||
"update": 0,
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
if args.action == "update" or args.action == "on_create":
|
||||
item = sys.stdin.readline()
|
||||
item = json.loads(item)
|
||||
item["action"]["update"] += 1
|
||||
item["title"] = f"Updated {item['action']['update']} times"
|
||||
print(json.dumps(item))
|
@ -1,11 +1,11 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "sh",
|
||||
"args": [
|
||||
"sh",
|
||||
"-c",
|
||||
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,8 +1,10 @@
|
||||
{
|
||||
"action": {
|
||||
"fetch": {
|
||||
"exe": "./update.py",
|
||||
"args": ["fetch"]
|
||||
"args": [
|
||||
"./update.py",
|
||||
"fetch"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
@ -1,4 +1,6 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
import tempfile
|
||||
|
||||
from intake.source import fetch_items, update_items, LocalSource
|
||||
|
||||
@ -8,6 +10,7 @@ def test_default_source(using_source):
|
||||
fetch = fetch_items(source)
|
||||
assert len(fetch) == 0
|
||||
|
||||
|
||||
def test_basic_lifecycle(using_source):
|
||||
source: LocalSource = using_source("test_inbox")
|
||||
state = {"inbox": [{"id": "first"}]}
|
||||
@ -61,3 +64,44 @@ def test_basic_lifecycle(using_source):
|
||||
items = list(source.get_all_items())
|
||||
assert len(items) == 1
|
||||
assert items[0]["id"] == "second"
|
||||
|
||||
|
||||
def test_batch():
|
||||
with tempfile.TemporaryDirectory() as data_dir:
|
||||
root = Path(data_dir)
|
||||
source_dir = root / "batching"
|
||||
source_dir.mkdir()
|
||||
config_file = source_dir / "intake.json"
|
||||
sh_args = [
|
||||
"python",
|
||||
"-c",
|
||||
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
|
||||
]
|
||||
batch_config = {
|
||||
"action": {
|
||||
"fetch": {
|
||||
"args": sh_args
|
||||
}
|
||||
},
|
||||
"batch": 0
|
||||
}
|
||||
config_file.write_text(json.dumps(batch_config))
|
||||
source = LocalSource(root, source_dir.name)
|
||||
|
||||
# batch sets the tts
|
||||
fetch1 = fetch_items(source)
|
||||
assert len(fetch1) == 1
|
||||
update_items(source, fetch1)
|
||||
item1 = source.get_item(fetch1[0]["id"])
|
||||
assert "tts" in item1._item
|
||||
|
||||
batch_config["batch"] = 3600
|
||||
config_file.write_text(json.dumps(batch_config))
|
||||
|
||||
fetch2 = fetch_items(source)
|
||||
assert len(fetch2) == 1
|
||||
update_items(source, fetch2)
|
||||
item2 = source.get_item(fetch2[0]["id"])
|
||||
assert "tts" in item2._item
|
||||
assert item1["id"] != item2["id"]
|
||||
assert item2.tts_at == item1.tts_at + 3600
|
||||
|
Loading…
Reference in New Issue
Block a user