Compare commits

..

22 Commits

Author SHA1 Message Date
Tim Van Baak bbe3f53cd4 Bump version to 1.3.1 2024-11-06 21:29:44 -08:00
Tim Van Baak 718220f889 Fix crontab on systems with crontab 2024-11-06 21:27:48 -08:00
Tim Van Baak 8337b74a80 Bump version to 1.3.0 2024-11-06 20:50:27 -08:00
Tim Van Baak 527e37003c Update tag title text 2024-11-06 20:49:54 -08:00
Tim Van Baak ec33495c56 Add source-level item batching 2024-11-06 20:49:32 -08:00
Tim Van Baak d0780a9fd1 Update type annotations 2024-11-06 19:16:21 -08:00
Tim Van Baak c903947f00 Expose pythonEnv from flake 2024-11-06 19:03:19 -08:00
Tim Van Baak e72f9aa349 Consolidate intake.core into intake.source
No point having a submodule for one function
2024-11-06 18:53:35 -08:00
Tim Van Baak 381de535f7 Fold action.exe into action.args 2024-11-06 18:39:44 -08:00
Tim Van Baak 9bb331941f Fix crontab on systems without crontab 2024-11-06 18:32:11 -08:00
Tim Van Baak 256a52e902 Provide link back to source in fetch result 2024-09-21 15:07:05 -07:00
Tim Van Baak 5cf7e8ef48 Update docs and version 2024-09-21 14:58:21 -07:00
Tim Van Baak 008c0c8230 Add `make lint` 2024-09-21 14:58:09 -07:00
Tim Van Baak 0c4177b783 Add on_create trigger 2024-09-21 14:58:00 -07:00
Tim Van Baak c21f55da8d Add script to reset test sources 2024-09-21 14:41:57 -07:00
Tim Van Baak 4abb281715 Include item titles in fetch response 2024-09-21 14:33:38 -07:00
Tim Van Baak 92a10c5ca8 Add a fetch button to source list 2024-09-21 14:13:00 -07:00
Tim Van Baak 4947ce56c6 Change source list to a table 2024-09-21 13:28:16 -07:00
Tim Van Baak 40ad075e96 Add unread count to channel links 2024-09-21 13:13:17 -07:00
Tim Van Baak 887a75bb29 Add Makefile
I don't remember if it's normal for `pytest` to fail to find the module and `python -m pytest` to succeed, but it seems like something to write down, and a Makefile is the place
2024-09-21 12:54:36 -07:00
Tim Van Baak a5754e7023 Apply nixfmt 2024-09-21 12:51:24 -07:00
Tim Van Baak 75c13a21dc Add nixfmt 2024-09-21 12:48:24 -07:00
26 changed files with 553 additions and 281 deletions

2
.gitignore vendored
View File

@ -159,6 +159,8 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
venv
# nixos-shell # nixos-shell
nixos.qcow2 nixos.qcow2

18
Makefile Normal file
View File

@ -0,0 +1,18 @@
.PHONY: *
help: ## display this help
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
venv: ## link the python dev environment to ./venv
nix build .#pythonEnv -o venv
lint: ## run linters
nix fmt
black **/*.py
test: ## run pytest
python -m pytest
reset: ## delete all items in tests/ and update each source once
rm -v tests/*/*.item
find tests -name intake.json -printf "%h\n" | cut -c7- | xargs -n 1 python -m intake update -d tests -s

View File

@ -31,32 +31,35 @@ intake
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "<absolute path to program or name on intake's PATH>", "args": ["program name", "and", "list", "of", "arguments"]
"args": ["list", "of", "program", "arguments"]
}, },
"<action name>": { "<action name>": {
"exe": "...",
"args": "..." "args": "..."
} }
}, },
"env": { "env": {
"...": "..." "...": "..."
}, },
"cron": "* * * * *" "cron": "* * * * *",
"batch": "<number>"
} }
``` ```
Each key under `action` defines an action that can be taken for the source. An action must contain `exe` and may contain `args`. A source must have a `fetch` action. Each key under `action` defines an action that can be taken for the source. A source must at least have a `fetch` action. If an action named `on_create` is defined for the source, it is executed once for an item when that item is created, that is, the first time the item is returned from the source.
Each key under `env` defines an environment variable that will be set when actions are executed. Each key under `env` defines an environment variable that will be set when `fetch` or other actions are executed.
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule. If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
## Interface for source programs ## Interface for source programs
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed. Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.
To execute an action, intake executes the `exe` program for the action with the corresponding `args` (if present) as arguments. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows: To execute an action, intake executes the command given by `args`. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
* intake's environment is inherited. * intake's environment is inherited.
* `STATE_PATH` is set to the absolute path of `state`. * `STATE_PATH` is set to the absolute path of `state`.

View File

@ -1,10 +1,9 @@
(import (import (
( let
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in lock = builtins.fromJSON (builtins.readFile ./flake.lock);
fetchTarball { in
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; fetchTarball {
sha256 = lock.nodes.flake-compat.locked.narHash; url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
} sha256 = lock.nodes.flake-compat.locked.narHash;
) }
{ src = ./.; } ) { src = ./.; }).defaultNix
).defaultNix

View File

@ -14,7 +14,7 @@
users.users.bob = { users.users.bob = {
isNormalUser = true; isNormalUser = true;
password = "beta"; password = "beta";
uid = 1001; uid = 1001;
packages = [ pkgs.intake ]; packages = [ pkgs.intake ];
}; };
@ -26,11 +26,13 @@
}; };
# Expose the vm's intake revproxy at host port 5234 # Expose the vm's intake revproxy at host port 5234
virtualisation.forwardPorts = [{ virtualisation.forwardPorts = [
from = "host"; {
host.port = 5234; from = "host";
guest.port = 8080; host.port = 5234;
}]; guest.port = 8080;
}
];
# Mount the demo content for both users # Mount the demo content for both users
nixos-shell.mounts = { nixos-shell.mounts = {
@ -48,19 +50,19 @@
# Create an activation script that copies and chowns the demo content # Create an activation script that copies and chowns the demo content
# chmod 777 because the users may not exist when the activation script runs # chmod 777 because the users may not exist when the activation script runs
system.activationScripts = system.activationScripts =
let let
userSetup = name: uid: '' userSetup = name: uid: ''
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake ${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/ ${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name} ${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \; ${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \; ${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
''; '';
in in
{ {
aliceSetup = userSetup "alice" "1000"; aliceSetup = userSetup "alice" "1000";
bobSetup = userSetup "bob" "1001"; bobSetup = userSetup "bob" "1001";
}; };
# Put the demo sources on the global PATH # Put the demo sources on the global PATH
environment.variables.PATH = "/mnt/sources"; environment.variables.PATH = "/mnt/sources";

135
flake.nix
View File

@ -13,61 +13,90 @@
nixos-shell.inputs.nixpkgs.follows = "nixpkgs"; nixos-shell.inputs.nixpkgs.follows = "nixpkgs";
}; };
outputs = { self, nixpkgs, flake-compat, nixos-shell }: outputs =
let {
inherit (nixpkgs.lib) makeOverridable nixosSystem; self,
system = "x86_64-linux"; nixpkgs,
in { flake-compat,
packages.${system} = let nixos-shell,
pkgs = (import nixpkgs { }:
let
inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (
pypkgs: with pypkgs; [
flask
black
pytest
]
);
in
{
formatter.${system} = nixpkgs.legacyPackages.${system}.nixfmt-rfc-style;
packages.${system} =
let
pkgs = (
import nixpkgs {
inherit system;
overlays = [ self.overlays.default ];
}
);
in
{
default = self.packages.${system}.intake;
inherit (pkgs) intake;
inherit pythonEnv;
};
devShells.${system} = {
default = pkgs.mkShell {
packages = [
pythonEnv
pkgs.gnumake
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
];
shellHook = ''
PS1="(develop) $PS1"
'';
};
};
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path {
path = ./.;
name = "intake";
};
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [
flask
setuptools
];
};
};
templates.source = {
path = builtins.path {
path = ./template;
name = "source";
};
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
inherit system; inherit system;
overlays = [ self.overlays.default ]; modules = [
}); nixos-shell.nixosModules.nixos-shell
in { self.nixosModules.default
default = self.packages.${system}.intake; ./demo
inherit (pkgs) intake;
};
devShells.${system} = {
default = let
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (pypkgs: with pypkgs; [ flask black pytest ]);
in pkgs.mkShell {
packages = [
pythonEnv
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
]; ];
shellHook = ''
PS1="(develop) $PS1"
'';
}; };
}; };
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path { path = ./.; name = "intake"; };
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [ flask setuptools ];
};
};
templates.source = {
path = builtins.path { path = ./template; name = "source"; };
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
inherit system;
modules = [
nixos-shell.nixosModules.nixos-shell
self.nixosModules.default
./demo
];
};
};
} }

View File

@ -18,9 +18,16 @@ from flask import (
current_app, current_app,
) )
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries from intake.crontab import update_crontab_entries
from intake.source import LocalSource, execute_action, Item from intake.source import (
LocalSource,
execute_action,
Item,
fetch_items,
update_items,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException
# Globals # Globals
app = Flask(__name__) app = Flask(__name__)
@ -115,10 +122,23 @@ def root():
if channels_config_path.exists(): if channels_config_path.exists():
channels = json.loads(channels_config_path.read_text(encoding="utf8")) channels = json.loads(channels_config_path.read_text(encoding="utf8"))
channel_counts = {
channel: len(
[
item
for source in sources
for item in LocalSource(data_path, source).get_all_items()
if not item.is_hidden
]
)
for channel, sources in channels.items()
}
return render_template( return render_template(
"home.jinja2", "home.jinja2",
sources=sources, sources=sources,
channels=channels, channels=channels,
channel_counts=channel_counts,
) )
@ -306,13 +326,15 @@ def _parse_source_config(config_str: str):
if "fetch" not in action: if "fetch" not in action:
return ("No fetch action defined", {}) return ("No fetch action defined", {})
fetch = action["fetch"] fetch = action["fetch"]
if "exe" not in fetch: if "exe" not in fetch and not fetch.get("args"):
return ("No fetch exe", {}) return ("No fetch exe", {})
config = {"action": parsed["action"]} config = {"action": parsed["action"]}
if "env" in parsed: if "env" in parsed:
config["env"] = parsed["env"] config["env"] = parsed["env"]
if "cron" in parsed: if "cron" in parsed:
config["cron"] = parsed["cron"] config["cron"] = parsed["cron"]
if "batch" in parsed:
config["batch"] = parsed["batch"]
return (None, config) return (None, config)
@ -368,6 +390,24 @@ def _parse_channels_config(config_str: str):
return (None, parsed) return (None, parsed)
@app.post("/fetch/<string:source_name>")
@auth_check
def fetch(source_name: str):
data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
try:
items = fetch_items(source)
update_items(source, items)
titles = "\n".join(f"<li>{item.display_title}</li>" for item in items)
source_url = url_for("source_feed", name=source_name)
return f'Update returned {len(items)} items:<ul>{titles}</ul><p><a href="{source_url}">{source_name}</a></p>'
except InvalidConfigException as ex:
abort(500, f"Could not fetch {source_name}:\n{ex}")
except SourceUpdateException as ex:
abort(500, f"Error updating source {source_name}:\n{ex}")
@app.post("/add") @app.post("/add")
@auth_check @auth_check
def add_item(): def add_item():
@ -379,7 +419,7 @@ def add_item():
config_path = source_path / "intake.json" config_path = source_path / "intake.json"
if not config_path.exists(): if not config_path.exists():
config_path.write_text( config_path.write_text(
json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2) json.dumps({"action": {"fetch": {"args": ["true"]}}}, indent=2)
) )
source = LocalSource(source_path.parent, source_path.name) source = LocalSource(source_path.parent, source_path.name)

View File

@ -10,9 +10,14 @@ import pwd
import subprocess import subprocess
import sys import sys
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries from intake.crontab import update_crontab_entries
from intake.source import fetch_items, LocalSource, update_items, execute_action from intake.source import (
fetch_items,
LocalSource,
update_items,
execute_action,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException from intake.types import InvalidConfigException, SourceUpdateException
@ -52,7 +57,6 @@ def cmd_edit(cmd_args):
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "",
"args": [], "args": [],
}, },
}, },

View File

@ -1,12 +0,0 @@
from pathlib import Path
import os
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")

View File

@ -1,5 +1,6 @@
from pathlib import Path from pathlib import Path
import os import os
import shutil
import subprocess import subprocess
import sys import sys
@ -28,15 +29,13 @@ def update_crontab_entries(data_path: Path):
Update the intake-managed section of the user's crontab. Update the intake-managed section of the user's crontab.
""" """
# If there is no crontab command available, quit early. # If there is no crontab command available, quit early.
cmd = ("command", "-v", "crontab") crontab = shutil.which("crontab")
print("Executing", *cmd, file=sys.stderr) if not crontab:
crontab_exists = subprocess.run(cmd, shell=True) print("No crontab found, skipping", file=sys.stderr)
if crontab_exists.returncode:
print("Could not update crontab", file=sys.stderr)
return return
# Get the current crontab # Get the current crontab
cmd = ["crontab", "-e"] cmd = [crontab, "-e"]
print("Executing", *cmd, file=sys.stderr) print("Executing", *cmd, file=sys.stderr)
get_crontab = subprocess.run( get_crontab = subprocess.run(
cmd, cmd,
@ -77,7 +76,7 @@ def update_crontab_entries(data_path: Path):
print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr) print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr)
# Save the updated crontab # Save the updated crontab
cmd = ["crontab", "-"] cmd = [crontab, "-"]
print("Executing", *cmd, file=sys.stderr) print("Executing", *cmd, file=sys.stderr)
new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8") new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8")
save_crontab = subprocess.Popen( save_crontab = subprocess.Popen(

View File

@ -3,7 +3,7 @@ from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread from threading import Thread
from time import time as current_time from time import time as current_time
from typing import List from typing import Generator
import json import json
import os import os
import os.path import os.path
@ -12,6 +12,16 @@ import sys
from intake.types import InvalidConfigException, SourceUpdateException from intake.types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")
class Item: class Item:
""" """
A wrapper for an item object. A wrapper for an item object.
@ -71,26 +81,31 @@ class Item:
# The time-to-live fields protects an item from removal until expiry. # The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source # This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity. # cannot guarantee monotonocity.
if "ttl" in self._item: if "ttl" in self._item and self.ttl_at > current_time():
ttl_date = self._item["created"] + self._item["ttl"] return False
if ttl_date > current_time():
return False
# The time-to-die field puts a maximum lifespan on an item, removing it # The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active. # even if it is active.
if "ttd" in self._item: if "ttd" in self._item and self.ttd_at < current_time():
ttd_date = self._item["created"] + self._item["ttd"] return True
if ttd_date < current_time():
return True
return not self._item["active"] return not self._item["active"]
@property
def tts_at(self):
return self._item["created"] + self._item.get("tts", 0)
@property
def ttl_at(self):
return self._item["created"] + self._item.get("ttl", 0)
@property
def ttd_at(self):
return self._item["created"] + self._item.get("ttd", 0)
@property @property
def before_tts(self): def before_tts(self):
return ( return "tts" in self._item and current_time() < self.tts_at
"tts" in self._item
and current_time() < self._item["created"] + self._item["tts"]
)
@property @property
def is_hidden(self): def is_hidden(self):
@ -158,7 +173,7 @@ class LocalSource:
def get_item_path(self, item_id: dict) -> Path: def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item" return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> List[str]: def get_item_ids(self) -> list[str]:
return [ return [
filepath.name[:-5] filepath.name[:-5]
for filepath in self.source_path.iterdir() for filepath in self.source_path.iterdir()
@ -183,7 +198,7 @@ class LocalSource:
def delete_item(self, item_id) -> None: def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id)) os.remove(self.get_item_path(item_id))
def get_all_items(self) -> List[Item]: def get_all_items(self) -> Generator[Item, None, None]:
for filepath in self.source_path.iterdir(): for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"): if filepath.name.endswith(".item"):
yield Item(self, json.loads(filepath.read_text(encoding="utf8"))) yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
@ -218,7 +233,7 @@ def _read_stderr(process: Popen) -> None:
def _execute_source_action( def _execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta source: LocalSource, action: str, input: str, timeout: timedelta
) -> List[str]: ) -> list[str]:
""" """
Execute the action from a given source. If stdin is specified, pass it Execute the action from a given source. If stdin is specified, pass it
along to the process. Returns lines from stdout. along to the process. Returns lines from stdout.
@ -229,10 +244,13 @@ def _execute_source_action(
if not action_cfg: if not action_cfg:
raise InvalidConfigException(f"No such action {action}") raise InvalidConfigException(f"No such action {action}")
if "exe" not in action_cfg:
exe = [action_cfg["exe"]] if "exe" in action_cfg else []
command = exe + action_cfg.get("args", [])
if not command:
raise InvalidConfigException(f"No exe for action {action}") raise InvalidConfigException(f"No exe for action {action}")
command = [action_cfg["exe"], *action_cfg.get("args", [])]
config_env = {key: str(value) for key, value in config.get("env", {}).items()} config_env = {key: str(value) for key, value in config.get("env", {}).items()}
env = { env = {
**os.environ.copy(), **os.environ.copy(),
@ -283,13 +301,13 @@ def _execute_source_action(
return output return output
def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]: def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]:
""" """
Execute the feed source and return the current feed items. Execute the feed source and return the current feed items.
Returns a list of feed items on success. Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed. Throws SourceUpdateException if the feed source update failed.
""" """
items: List[Item] = [] items: list[Item] = []
output = _execute_source_action(source, "fetch", None, timedelta(timeout)) output = _execute_source_action(source, "fetch", None, timedelta(timeout))
@ -325,29 +343,54 @@ def execute_action(
raise SourceUpdateException("invalid json") raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items: List[Item]): def update_items(source: LocalSource, fetched_items: list[Item]):
""" """
Update the source with a batch of new items, doing creations, updates, and Update the source with a batch of new items, doing creations, updates, and
deletions as necessary. deletions as necessary.
""" """
config = source.get_config()
# Get a list of item ids that already existed for this source. # Get a list of item ids that already existed for this source.
prior_ids = source.get_item_ids() prior_ids = source.get_item_ids()
print(f"Found {len(prior_ids)} prior items", file=sys.stderr) print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
# Determine which items are new and which are updates. # Determine which items are new and which are updates.
new_items: List[Item] = [] new_items: list[Item] = []
upd_items: List[Item] = [] upd_items: list[Item] = []
for item in fetched_items: for item in fetched_items:
if source.item_exists(item["id"]): if source.item_exists(item["id"]):
upd_items.append(item) upd_items.append(item)
else: else:
new_items.append(item) new_items.append(item)
# If the source is batched, set the tts on new items to at least the batch tts
if "batch" in config:
try:
batch_adj = int(config["batch"])
now = current_time() - batch_adj
batch_start = now - (now % 86400)
batch_end = batch_start + 86400 + batch_adj
for item in new_items:
min_tts = batch_end - item["created"]
item["tts"] = min(min_tts, item.get("tts", min_tts))
except:
pass
# Write all the new items to the source directory. # Write all the new items to the source directory.
for item in new_items: for item in new_items:
# TODO: support on-create trigger
source.save_item(item) source.save_item(item)
# If the source has an on-create trigger, run it for new items
if "on_create" in config.get("action", {}):
for item in new_items:
try:
execute_action(source, item["id"], "on_create")
except Exception as ex:
print(
f"on_create failed for {source.source_name}/{item['id']}:\n{ex}",
file=sys.stderr,
)
# Update the other items using the fetched items' values. # Update the other items using the fetched items' values.
for upd_item in upd_items: for upd_item in upd_items:
old_item = source.get_item(upd_item["id"]) old_item = source.get_item(upd_item["id"])

View File

@ -174,11 +174,13 @@ var doAction = function (source, itemid, action) {
{# source/id/created footer line #} {# source/id/created footer line #}
{% if item.source or item.id or item.created %} {% if item.source or item.id or item.created %}
<span class="item-info" title="{{ 'Tags: {}'.format(', '.join(item.tags)) }}"> <span class="item-info" title="{% if item.tags %}{{ 'Tags: {}'.format(', '.join(item.tags)) }}{% else %}No tags{% endif %}">
{% if item.source %}{{item.source}}{% endif %} {% if item.source %}{{item.source}}{% endif %}
{% if item.id %}{{item.id}}{% endif %} {% if item.id %}{{item.id}}{% endif %}
{% if item.created %}{{item.created|datetimeformat}}{% endif %} {% if item.created %}{{item.created|datetimeformat}}{% endif %}
{% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %} {% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %}
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
</span> </span>
{% endif %} {% endif %}

View File

@ -34,6 +34,12 @@ summary:focus {
width: 100%; width: 100%;
resize: vertical; resize: vertical;
} }
.intake-sources td {
padding-block: 0.4em;
}
.intake-sources form {
margin: 0
}
</style> </style>
</head> </head>
<body> <body>
@ -46,7 +52,13 @@ summary:focus {
<p>No channels found.</p> <p>No channels found.</p>
{% else %} {% else %}
{% for channel in channels %} {% for channel in channels %}
<p><a href="{{ url_for('channel_feed', name=channel) }}">{{ channel }}</a></p> <p><a href="{{ url_for('channel_feed', name=channel) }}">
{%- if channel_counts[channel] -%}
({{ channel_counts[channel] }}) {{ channel }}
{%- else -%}
{{ channel }}
{%- endif -%}
</a></p>
{% endfor %} {% endfor %}
{% endif %} {% endif %}
<p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p> <p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p>
@ -59,17 +71,22 @@ summary:focus {
{% if not sources %} {% if not sources %}
<p>No sources found.</p> <p>No sources found.</p>
{% else %} {% else %}
<table class="intake-sources">
{% for source in sources %} {% for source in sources %}
<p> <tr>
<td>
{%- for channel, srcs in channels|items -%} {%- for channel, srcs in channels|items -%}
{%- if source.source_name in srcs -%} {%- if source.source_name in srcs -%}
^ ^
{%- endif -%} {%- endif -%}
{%- endfor -%} {%- endfor -%}
<a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a> </td>
(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>) <td><form id="{{ source.source_name }}"><button type="submit" formmethod=post formaction="{{ url_for('fetch', source_name=source.source_name) }}" />fetch</button></form></td>
</p> <td>(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)</td>
<td><a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a></td>
</tr>
{% endfor %} {% endfor %}
</table>
{% endif %} {% endif %}
</details> </details>
</article> </article>

View File

@ -1,153 +1,179 @@
flake: { config, lib, pkgs, ... }: flake:
{
config,
lib,
pkgs,
...
}:
let let
inherit (lib) filterAttrs foldl imap1 mapAttrsToList mkEnableOption mkIf mkMerge mkOption mkPackageOption types; inherit (lib)
filterAttrs
foldl
imap1
mapAttrsToList
mkEnableOption
mkIf
mkMerge
mkOption
mkPackageOption
types
;
intakeCfg = config.services.intake; intakeCfg = config.services.intake;
in { in
{
options = { options = {
services.intake = { services.intake = {
listen.addr = mkOption { listen.addr = mkOption {
type = types.str; type = types.str;
default = "0.0.0.0"; default = "0.0.0.0";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a " description = "The listen address for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
"local port based on the request's HTTP Basic Auth credentials.";
}; };
listen.port = mkOption { listen.port = mkOption {
type = types.port; type = types.port;
default = 80; default = 80;
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local " description = "The listen port for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
"port based on the request's HTTP Basic Auth credentials.";
}; };
package = mkPackageOption pkgs "intake" {}; package = mkPackageOption pkgs "intake" { };
internalPortStart = mkOption { internalPortStart = mkOption {
type = types.port; type = types.port;
default = 24130; default = 24130;
description = "The first port to use for internal service endpoints. A number of ports will be continguously " description = "The first port to use for internal service endpoints. A number of ports will be continguously allocated equal to the number of users with enabled intake services.";
"allocated equal to the number of users with enabled intake services.";
}; };
extraPackages = mkOption { extraPackages = mkOption {
type = types.listOf types.package; type = types.listOf types.package;
default = []; default = [ ];
description = "Extra packages available to all enabled users and their intake services."; description = "Extra packages available to all enabled users and their intake services.";
}; };
users = mkOption { users = mkOption {
description = "User intake service definitions."; description = "User intake service definitions.";
default = {}; default = { };
type = types.attrsOf (types.submodule { type = types.attrsOf (
options = { types.submodule {
enable = mkEnableOption "intake, a personal feed aggregator."; options = {
enable = mkEnableOption "intake, a personal feed aggregator.";
extraPackages = mkOption { extraPackages = mkOption {
type = types.listOf types.package; type = types.listOf types.package;
default = []; default = [ ];
description = "Extra packages available to this user and their intake service."; description = "Extra packages available to this user and their intake service.";
};
}; };
}; }
}); );
}; };
}; };
}; };
config = config =
let
# Define the intake package and a python environment to run it from
intake = intakeCfg.package;
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
# Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: { ${userName} = i + intakeCfg.internalPortStart; }) enabledUserNames;
userPort = foldl (acc: val: acc // val) {} userPortList;
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
${pkgs.apacheHttpd}/bin/htpasswd $@
'';
# File locations
intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd";
in {
# Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ];
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != {}) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let let
addPackagesToUser = userName: { # Define the intake package and a python environment to run it from
${userName}.packages = intake = intakeCfg.package;
[ htpasswdWrapper intake ] pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
++ intakeCfg.extraPackages
++ intakeCfg.users.${userName}.extraPackages;
};
in mkMerge (map addPackagesToUser enabledUserNames);
# Enable cron # Assign each user an internal port for their personal intake instance
services.cron.enable = true; enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: {
${userName} = i + intakeCfg.internalPortStart;
}) enabledUserNames;
userPort = foldl (acc: val: acc // val) { } userPortList;
# Define a user service for each configured user # To avoid polluting PATH with httpd programs, define an htpasswd wrapper
systemd.services = htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
let ${pkgs.apacheHttpd}/bin/htpasswd $@
runScript = userName: pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
''; '';
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth # File locations
services.nginx = mkIf (enabledUsers != {}) { intakeDir = "/etc/intake";
enable = true; intakePwd = "${intakeDir}/htpasswd";
virtualHosts."intake" = mkIf (enabledUsers != {}) { in
listen = [ intakeCfg.listen ]; {
locations."/" = { # Apply the overlay so intake is included in pkgs.
proxyPass = "http://127.0.0.1:$target_port"; nixpkgs.overlays = [ flake.overlays.default ];
basicAuthFile = intakePwd;
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != { }) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let
addPackagesToUser = userName: {
${userName}.packages = [
htpasswdWrapper
intake
] ++ intakeCfg.extraPackages ++ intakeCfg.users.${userName}.extraPackages;
};
in
mkMerge (map addPackagesToUser enabledUserNames);
# Enable cron
services.cron.enable = true;
# Define a user service for each configured user
systemd.services =
let
runScript =
userName:
pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
'';
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in
mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != { }) {
enable = true;
virtualHosts."intake" = mkIf (enabledUsers != { }) {
listen = [ intakeCfg.listen ];
locations."/" = {
proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd;
};
extraConfig = foldl (acc: val: acc + val) "" (
mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort
);
}; };
extraConfig = foldl (acc: val: acc + val) "" (mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort);
}; };
}; };
};
} }

View File

@ -1,6 +1,6 @@
[project] [project]
name = "intake" name = "intake"
version = "1.1.0" version = "1.3.1"
[project.scripts] [project.scripts]
intake = "intake.cli:main" intake = "intake.cli:main"

View File

@ -1,10 +1,9 @@
(import (import (
( let
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in lock = builtins.fromJSON (builtins.readFile ./flake.lock);
fetchTarball { in
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; fetchTarball {
sha256 = lock.nodes.flake-compat.locked.narHash; url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
} sha256 = lock.nodes.flake-compat.locked.narHash;
) }
{ src = ./.; } ) { src = ./.; }).shellNix
).shellNix

View File

@ -2,6 +2,7 @@
"demo": [ "demo": [
"demo_basic_callback", "demo_basic_callback",
"demo_logging", "demo_logging",
"demo_raw_sh" "demo_raw_sh",
"demo_oncreate"
] ]
} }

View File

@ -1,5 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import List, Callable from typing import Generator, Callable
import pytest import pytest
@ -14,15 +14,16 @@ def clean_source(source_path: Path):
@pytest.fixture @pytest.fixture
def using_source() -> Callable: def using_source() -> Generator[Callable, None, LocalSource]:
test_data = Path(__file__).parent test_data = Path(__file__).parent
sources: List[Path] = [] sources: list[Path] = []
def _using_source(name: str): def _using_source(name: str):
source_path = test_data / name source_path = test_data / name
clean_source(source_path) clean_source(source_path)
sources.append(source_path) sources.append(source_path)
return LocalSource(test_data, name) return LocalSource(test_data, name)
yield _using_source yield _using_source
for source_path in sources: for source_path in sources:

View File

@ -1,7 +1,9 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "true" "args": [
"true"
]
} }
} }
} }

View File

@ -1,20 +1,20 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"fetch" "fetch"
] ]
}, },
"increment": { "increment": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"increment" "increment"
] ]
}, },
"decrement": { "decrement": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"decrement" "decrement"
] ]
} }

View File

@ -1,8 +1,8 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "python3",
"args": [ "args": [
"python3",
"update.py" "update.py"
] ]
} }

View File

@ -0,0 +1,22 @@
{
"action": {
"fetch": {
"args": [
"./update.py",
"fetch"
]
},
"update": {
"args": [
"./update.py",
"update"
]
},
"on_create": {
"args": [
"./update.py",
"update"
]
}
}
}

29
tests/demo_oncreate/update.py Executable file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python3
import argparse, json, sys, time
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(
json.dumps(
{
"id": str(int(time.time())),
"title": "Title has not been updated",
"action": {
"update": 0,
},
}
)
)
if args.action == "update" or args.action == "on_create":
item = sys.stdin.readline()
item = json.loads(item)
item["action"]["update"] += 1
item["title"] = f"Updated {item['action']['update']} times"
print(json.dumps(item))

View File

@ -1,8 +1,8 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "sh",
"args": [ "args": [
"sh",
"-c", "-c",
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}" "echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
] ]

View File

@ -1,8 +1,10 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "./update.py", "args": [
"args": ["fetch"] "./update.py",
"fetch"
]
} }
} }
} }

View File

@ -1,4 +1,6 @@
import json import json
from pathlib import Path
import tempfile
from intake.source import fetch_items, update_items, LocalSource from intake.source import fetch_items, update_items, LocalSource
@ -8,6 +10,7 @@ def test_default_source(using_source):
fetch = fetch_items(source) fetch = fetch_items(source)
assert len(fetch) == 0 assert len(fetch) == 0
def test_basic_lifecycle(using_source): def test_basic_lifecycle(using_source):
source: LocalSource = using_source("test_inbox") source: LocalSource = using_source("test_inbox")
state = {"inbox": [{"id": "first"}]} state = {"inbox": [{"id": "first"}]}
@ -61,3 +64,44 @@ def test_basic_lifecycle(using_source):
items = list(source.get_all_items()) items = list(source.get_all_items())
assert len(items) == 1 assert len(items) == 1
assert items[0]["id"] == "second" assert items[0]["id"] == "second"
def test_batch():
with tempfile.TemporaryDirectory() as data_dir:
root = Path(data_dir)
source_dir = root / "batching"
source_dir.mkdir()
config_file = source_dir / "intake.json"
sh_args = [
"python",
"-c",
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
]
batch_config = {
"action": {
"fetch": {
"args": sh_args
}
},
"batch": 0
}
config_file.write_text(json.dumps(batch_config))
source = LocalSource(root, source_dir.name)
# batch sets the tts
fetch1 = fetch_items(source)
assert len(fetch1) == 1
update_items(source, fetch1)
item1 = source.get_item(fetch1[0]["id"])
assert "tts" in item1._item
batch_config["batch"] = 3600
config_file.write_text(json.dumps(batch_config))
fetch2 = fetch_items(source)
assert len(fetch2) == 1
update_items(source, fetch2)
item2 = source.get_item(fetch2[0]["id"])
assert "tts" in item2._item
assert item1["id"] != item2["id"]
assert item2.tts_at == item1.tts_at + 3600