Compare commits

...

22 Commits

Author SHA1 Message Date
bbe3f53cd4 Bump version to 1.3.1 2024-11-06 21:29:44 -08:00
718220f889 Fix crontab on systems with crontab 2024-11-06 21:27:48 -08:00
8337b74a80 Bump version to 1.3.0 2024-11-06 20:50:27 -08:00
527e37003c Update tag title text 2024-11-06 20:49:54 -08:00
ec33495c56 Add source-level item batching 2024-11-06 20:49:32 -08:00
d0780a9fd1 Update type annotations 2024-11-06 19:16:21 -08:00
c903947f00 Expose pythonEnv from flake 2024-11-06 19:03:19 -08:00
e72f9aa349 Consolidate intake.core into intake.source
No point having a submodule for one function
2024-11-06 18:53:35 -08:00
381de535f7 Fold action.exe into action.args 2024-11-06 18:39:44 -08:00
9bb331941f Fix crontab on systems without crontab 2024-11-06 18:32:11 -08:00
256a52e902 Provide link back to source in fetch result 2024-09-21 15:07:05 -07:00
5cf7e8ef48 Update docs and version 2024-09-21 14:58:21 -07:00
008c0c8230 Add make lint 2024-09-21 14:58:09 -07:00
0c4177b783 Add on_create trigger 2024-09-21 14:58:00 -07:00
c21f55da8d Add script to reset test sources 2024-09-21 14:41:57 -07:00
4abb281715 Include item titles in fetch response 2024-09-21 14:33:38 -07:00
92a10c5ca8 Add a fetch button to source list 2024-09-21 14:13:00 -07:00
4947ce56c6 Change source list to a table 2024-09-21 13:28:16 -07:00
40ad075e96 Add unread count to channel links 2024-09-21 13:13:17 -07:00
887a75bb29 Add Makefile
I don't remember if it's normal for `pytest` to fail to find the module and `python -m pytest` to succeed, but it seems like something to write down, and a Makefile is the place
2024-09-21 12:54:36 -07:00
a5754e7023 Apply nixfmt 2024-09-21 12:51:24 -07:00
75c13a21dc Add nixfmt 2024-09-21 12:48:24 -07:00
26 changed files with 553 additions and 281 deletions

4
.gitignore vendored
View File

@ -159,6 +159,8 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
venv
# nixos-shell
nixos.qcow2
@ -166,4 +168,4 @@ nixos.qcow2
result
# test sources
tests/**/*.item
tests/**/*.item

18
Makefile Normal file
View File

@ -0,0 +1,18 @@
.PHONY: *
help: ## display this help
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
venv: ## link the python dev environment to ./venv
nix build .#pythonEnv -o venv
lint: ## run linters
nix fmt
black **/*.py
test: ## run pytest
python -m pytest
reset: ## delete all items in tests/ and update each source once
rm -v tests/*/*.item
find tests -name intake.json -printf "%h\n" | cut -c7- | xargs -n 1 python -m intake update -d tests -s

View File

@ -31,32 +31,35 @@ intake
{
"action": {
"fetch": {
"exe": "<absolute path to program or name on intake's PATH>",
"args": ["list", "of", "program", "arguments"]
"args": ["program name", "and", "list", "of", "arguments"]
},
"<action name>": {
"exe": "...",
"args": "..."
}
},
"env": {
"...": "..."
},
"cron": "* * * * *"
"cron": "* * * * *",
"batch": "<number>"
}
```
Each key under `action` defines an action that can be taken for the source. An action must contain `exe` and may contain `args`. A source must have a `fetch` action.
Each key under `action` defines an action that can be taken for the source. A source must at least have a `fetch` action. If an action named `on_create` is defined for the source, it is executed once for an item when that item is created, that is, the first time the item is returned from the source.
Each key under `env` defines an environment variable that will be set when actions are executed.
Each key under `env` defines an environment variable that will be set when `fetch` or other actions are executed.
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
## Interface for source programs
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.
To execute an action, intake executes the `exe` program for the action with the corresponding `args` (if present) as arguments. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
To execute an action, intake executes the command given by `args`. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
* intake's environment is inherited.
* `STATE_PATH` is set to the absolute path of `state`.

View File

@ -1,10 +1,9 @@
(import
(
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
)
{ src = ./.; }
).defaultNix
(import (
let
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
) { src = ./.; }).defaultNix

View File

@ -14,7 +14,7 @@
users.users.bob = {
isNormalUser = true;
password = "beta";
uid = 1001;
uid = 1001;
packages = [ pkgs.intake ];
};
@ -26,11 +26,13 @@
};
# Expose the vm's intake revproxy at host port 5234
virtualisation.forwardPorts = [{
from = "host";
host.port = 5234;
guest.port = 8080;
}];
virtualisation.forwardPorts = [
{
from = "host";
host.port = 5234;
guest.port = 8080;
}
];
# Mount the demo content for both users
nixos-shell.mounts = {
@ -47,20 +49,20 @@
# Create an activation script that copies and chowns the demo content
# chmod 777 because the users may not exist when the activation script runs
system.activationScripts =
let
userSetup = name: uid: ''
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
'';
in
{
aliceSetup = userSetup "alice" "1000";
bobSetup = userSetup "bob" "1001";
};
system.activationScripts =
let
userSetup = name: uid: ''
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
'';
in
{
aliceSetup = userSetup "alice" "1000";
bobSetup = userSetup "bob" "1001";
};
# Put the demo sources on the global PATH
environment.variables.PATH = "/mnt/sources";

135
flake.nix
View File

@ -13,61 +13,90 @@
nixos-shell.inputs.nixpkgs.follows = "nixpkgs";
};
outputs = { self, nixpkgs, flake-compat, nixos-shell }:
let
inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux";
in {
packages.${system} = let
pkgs = (import nixpkgs {
outputs =
{
self,
nixpkgs,
flake-compat,
nixos-shell,
}:
let
inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (
pypkgs: with pypkgs; [
flask
black
pytest
]
);
in
{
formatter.${system} = nixpkgs.legacyPackages.${system}.nixfmt-rfc-style;
packages.${system} =
let
pkgs = (
import nixpkgs {
inherit system;
overlays = [ self.overlays.default ];
}
);
in
{
default = self.packages.${system}.intake;
inherit (pkgs) intake;
inherit pythonEnv;
};
devShells.${system} = {
default = pkgs.mkShell {
packages = [
pythonEnv
pkgs.gnumake
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
];
shellHook = ''
PS1="(develop) $PS1"
'';
};
};
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path {
path = ./.;
name = "intake";
};
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [
flask
setuptools
];
};
};
templates.source = {
path = builtins.path {
path = ./template;
name = "source";
};
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
inherit system;
overlays = [ self.overlays.default ];
});
in {
default = self.packages.${system}.intake;
inherit (pkgs) intake;
};
devShells.${system} = {
default = let
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (pypkgs: with pypkgs; [ flask black pytest ]);
in pkgs.mkShell {
packages = [
pythonEnv
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
modules = [
nixos-shell.nixosModules.nixos-shell
self.nixosModules.default
./demo
];
shellHook = ''
PS1="(develop) $PS1"
'';
};
};
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path { path = ./.; name = "intake"; };
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [ flask setuptools ];
};
};
templates.source = {
path = builtins.path { path = ./template; name = "source"; };
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
inherit system;
modules = [
nixos-shell.nixosModules.nixos-shell
self.nixosModules.default
./demo
];
};
};
}

View File

@ -18,9 +18,16 @@ from flask import (
current_app,
)
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries
from intake.source import LocalSource, execute_action, Item
from intake.source import (
LocalSource,
execute_action,
Item,
fetch_items,
update_items,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException
# Globals
app = Flask(__name__)
@ -115,10 +122,23 @@ def root():
if channels_config_path.exists():
channels = json.loads(channels_config_path.read_text(encoding="utf8"))
channel_counts = {
channel: len(
[
item
for source in sources
for item in LocalSource(data_path, source).get_all_items()
if not item.is_hidden
]
)
for channel, sources in channels.items()
}
return render_template(
"home.jinja2",
sources=sources,
channels=channels,
channel_counts=channel_counts,
)
@ -306,13 +326,15 @@ def _parse_source_config(config_str: str):
if "fetch" not in action:
return ("No fetch action defined", {})
fetch = action["fetch"]
if "exe" not in fetch:
if "exe" not in fetch and not fetch.get("args"):
return ("No fetch exe", {})
config = {"action": parsed["action"]}
if "env" in parsed:
config["env"] = parsed["env"]
if "cron" in parsed:
config["cron"] = parsed["cron"]
if "batch" in parsed:
config["batch"] = parsed["batch"]
return (None, config)
@ -368,6 +390,24 @@ def _parse_channels_config(config_str: str):
return (None, parsed)
@app.post("/fetch/<string:source_name>")
@auth_check
def fetch(source_name: str):
data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
try:
items = fetch_items(source)
update_items(source, items)
titles = "\n".join(f"<li>{item.display_title}</li>" for item in items)
source_url = url_for("source_feed", name=source_name)
return f'Update returned {len(items)} items:<ul>{titles}</ul><p><a href="{source_url}">{source_name}</a></p>'
except InvalidConfigException as ex:
abort(500, f"Could not fetch {source_name}:\n{ex}")
except SourceUpdateException as ex:
abort(500, f"Error updating source {source_name}:\n{ex}")
@app.post("/add")
@auth_check
def add_item():
@ -379,7 +419,7 @@ def add_item():
config_path = source_path / "intake.json"
if not config_path.exists():
config_path.write_text(
json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2)
json.dumps({"action": {"fetch": {"args": ["true"]}}}, indent=2)
)
source = LocalSource(source_path.parent, source_path.name)

View File

@ -10,9 +10,14 @@ import pwd
import subprocess
import sys
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries
from intake.source import fetch_items, LocalSource, update_items, execute_action
from intake.source import (
fetch_items,
LocalSource,
update_items,
execute_action,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException
@ -52,7 +57,6 @@ def cmd_edit(cmd_args):
{
"action": {
"fetch": {
"exe": "",
"args": [],
},
},

View File

@ -1,12 +0,0 @@
from pathlib import Path
import os
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")

View File

@ -1,5 +1,6 @@
from pathlib import Path
import os
import shutil
import subprocess
import sys
@ -28,15 +29,13 @@ def update_crontab_entries(data_path: Path):
Update the intake-managed section of the user's crontab.
"""
# If there is no crontab command available, quit early.
cmd = ("command", "-v", "crontab")
print("Executing", *cmd, file=sys.stderr)
crontab_exists = subprocess.run(cmd, shell=True)
if crontab_exists.returncode:
print("Could not update crontab", file=sys.stderr)
crontab = shutil.which("crontab")
if not crontab:
print("No crontab found, skipping", file=sys.stderr)
return
# Get the current crontab
cmd = ["crontab", "-e"]
cmd = [crontab, "-e"]
print("Executing", *cmd, file=sys.stderr)
get_crontab = subprocess.run(
cmd,
@ -77,7 +76,7 @@ def update_crontab_entries(data_path: Path):
print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr)
# Save the updated crontab
cmd = ["crontab", "-"]
cmd = [crontab, "-"]
print("Executing", *cmd, file=sys.stderr)
new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8")
save_crontab = subprocess.Popen(

View File

@ -3,7 +3,7 @@ from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread
from time import time as current_time
from typing import List
from typing import Generator
import json
import os
import os.path
@ -12,6 +12,16 @@ import sys
from intake.types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")
class Item:
"""
A wrapper for an item object.
@ -71,26 +81,31 @@ class Item:
# The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity.
if "ttl" in self._item:
ttl_date = self._item["created"] + self._item["ttl"]
if ttl_date > current_time():
return False
if "ttl" in self._item and self.ttl_at > current_time():
return False
# The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active.
if "ttd" in self._item:
ttd_date = self._item["created"] + self._item["ttd"]
if ttd_date < current_time():
return True
if "ttd" in self._item and self.ttd_at < current_time():
return True
return not self._item["active"]
@property
def tts_at(self):
return self._item["created"] + self._item.get("tts", 0)
@property
def ttl_at(self):
return self._item["created"] + self._item.get("ttl", 0)
@property
def ttd_at(self):
return self._item["created"] + self._item.get("ttd", 0)
@property
def before_tts(self):
return (
"tts" in self._item
and current_time() < self._item["created"] + self._item["tts"]
)
return "tts" in self._item and current_time() < self.tts_at
@property
def is_hidden(self):
@ -158,7 +173,7 @@ class LocalSource:
def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> List[str]:
def get_item_ids(self) -> list[str]:
return [
filepath.name[:-5]
for filepath in self.source_path.iterdir()
@ -183,7 +198,7 @@ class LocalSource:
def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id))
def get_all_items(self) -> List[Item]:
def get_all_items(self) -> Generator[Item, None, None]:
for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"):
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
@ -218,7 +233,7 @@ def _read_stderr(process: Popen) -> None:
def _execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta
) -> List[str]:
) -> list[str]:
"""
Execute the action from a given source. If stdin is specified, pass it
along to the process. Returns lines from stdout.
@ -229,10 +244,13 @@ def _execute_source_action(
if not action_cfg:
raise InvalidConfigException(f"No such action {action}")
if "exe" not in action_cfg:
exe = [action_cfg["exe"]] if "exe" in action_cfg else []
command = exe + action_cfg.get("args", [])
if not command:
raise InvalidConfigException(f"No exe for action {action}")
command = [action_cfg["exe"], *action_cfg.get("args", [])]
config_env = {key: str(value) for key, value in config.get("env", {}).items()}
env = {
**os.environ.copy(),
@ -283,13 +301,13 @@ def _execute_source_action(
return output
def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]:
def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]:
"""
Execute the feed source and return the current feed items.
Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed.
"""
items: List[Item] = []
items: list[Item] = []
output = _execute_source_action(source, "fetch", None, timedelta(timeout))
@ -325,29 +343,54 @@ def execute_action(
raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items: List[Item]):
def update_items(source: LocalSource, fetched_items: list[Item]):
"""
Update the source with a batch of new items, doing creations, updates, and
deletions as necessary.
"""
config = source.get_config()
# Get a list of item ids that already existed for this source.
prior_ids = source.get_item_ids()
print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
# Determine which items are new and which are updates.
new_items: List[Item] = []
upd_items: List[Item] = []
new_items: list[Item] = []
upd_items: list[Item] = []
for item in fetched_items:
if source.item_exists(item["id"]):
upd_items.append(item)
else:
new_items.append(item)
# If the source is batched, set the tts on new items to at least the batch tts
if "batch" in config:
try:
batch_adj = int(config["batch"])
now = current_time() - batch_adj
batch_start = now - (now % 86400)
batch_end = batch_start + 86400 + batch_adj
for item in new_items:
min_tts = batch_end - item["created"]
item["tts"] = min(min_tts, item.get("tts", min_tts))
except:
pass
# Write all the new items to the source directory.
for item in new_items:
# TODO: support on-create trigger
source.save_item(item)
# If the source has an on-create trigger, run it for new items
if "on_create" in config.get("action", {}):
for item in new_items:
try:
execute_action(source, item["id"], "on_create")
except Exception as ex:
print(
f"on_create failed for {source.source_name}/{item['id']}:\n{ex}",
file=sys.stderr,
)
# Update the other items using the fetched items' values.
for upd_item in upd_items:
old_item = source.get_item(upd_item["id"])

View File

@ -174,11 +174,13 @@ var doAction = function (source, itemid, action) {
{# source/id/created footer line #}
{% if item.source or item.id or item.created %}
<span class="item-info" title="{{ 'Tags: {}'.format(', '.join(item.tags)) }}">
<span class="item-info" title="{% if item.tags %}{{ 'Tags: {}'.format(', '.join(item.tags)) }}{% else %}No tags{% endif %}">
{% if item.source %}{{item.source}}{% endif %}
{% if item.id %}{{item.id}}{% endif %}
{% if item.created %}{{item.created|datetimeformat}}{% endif %}
{% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %}
{% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %}
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
</span>
{% endif %}

View File

@ -34,6 +34,12 @@ summary:focus {
width: 100%;
resize: vertical;
}
.intake-sources td {
padding-block: 0.4em;
}
.intake-sources form {
margin: 0
}
</style>
</head>
<body>
@ -46,7 +52,13 @@ summary:focus {
<p>No channels found.</p>
{% else %}
{% for channel in channels %}
<p><a href="{{ url_for('channel_feed', name=channel) }}">{{ channel }}</a></p>
<p><a href="{{ url_for('channel_feed', name=channel) }}">
{%- if channel_counts[channel] -%}
({{ channel_counts[channel] }}) {{ channel }}
{%- else -%}
{{ channel }}
{%- endif -%}
</a></p>
{% endfor %}
{% endif %}
<p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p>
@ -59,17 +71,22 @@ summary:focus {
{% if not sources %}
<p>No sources found.</p>
{% else %}
<table class="intake-sources">
{% for source in sources %}
<p>
<tr>
<td>
{%- for channel, srcs in channels|items -%}
{%- if source.source_name in srcs -%}
^
{%- endif -%}
{%- endfor -%}
<a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a>
(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)
</p>
</td>
<td><form id="{{ source.source_name }}"><button type="submit" formmethod=post formaction="{{ url_for('fetch', source_name=source.source_name) }}" />fetch</button></form></td>
<td>(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)</td>
<td><a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a></td>
</tr>
{% endfor %}
</table>
{% endif %}
</details>
</article>

View File

@ -1,153 +1,179 @@
flake: { config, lib, pkgs, ... }:
flake:
{
config,
lib,
pkgs,
...
}:
let
inherit (lib) filterAttrs foldl imap1 mapAttrsToList mkEnableOption mkIf mkMerge mkOption mkPackageOption types;
inherit (lib)
filterAttrs
foldl
imap1
mapAttrsToList
mkEnableOption
mkIf
mkMerge
mkOption
mkPackageOption
types
;
intakeCfg = config.services.intake;
in {
in
{
options = {
services.intake = {
listen.addr = mkOption {
type = types.str;
default = "0.0.0.0";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a "
"local port based on the request's HTTP Basic Auth credentials.";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
};
listen.port = mkOption {
type = types.port;
default = 80;
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local "
"port based on the request's HTTP Basic Auth credentials.";
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
};
package = mkPackageOption pkgs "intake" {};
package = mkPackageOption pkgs "intake" { };
internalPortStart = mkOption {
type = types.port;
default = 24130;
description = "The first port to use for internal service endpoints. A number of ports will be continguously "
"allocated equal to the number of users with enabled intake services.";
description = "The first port to use for internal service endpoints. A number of ports will be continguously allocated equal to the number of users with enabled intake services.";
};
extraPackages = mkOption {
type = types.listOf types.package;
default = [];
default = [ ];
description = "Extra packages available to all enabled users and their intake services.";
};
users = mkOption {
description = "User intake service definitions.";
default = {};
type = types.attrsOf (types.submodule {
options = {
enable = mkEnableOption "intake, a personal feed aggregator.";
default = { };
type = types.attrsOf (
types.submodule {
options = {
enable = mkEnableOption "intake, a personal feed aggregator.";
extraPackages = mkOption {
type = types.listOf types.package;
default = [];
description = "Extra packages available to this user and their intake service.";
extraPackages = mkOption {
type = types.listOf types.package;
default = [ ];
description = "Extra packages available to this user and their intake service.";
};
};
};
});
}
);
};
};
};
config =
let
# Define the intake package and a python environment to run it from
intake = intakeCfg.package;
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
# Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: { ${userName} = i + intakeCfg.internalPortStart; }) enabledUserNames;
userPort = foldl (acc: val: acc // val) {} userPortList;
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
${pkgs.apacheHttpd}/bin/htpasswd $@
'';
# File locations
intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd";
in {
# Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ];
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != {}) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let
addPackagesToUser = userName: {
${userName}.packages =
[ htpasswdWrapper intake ]
++ intakeCfg.extraPackages
++ intakeCfg.users.${userName}.extraPackages;
};
in mkMerge (map addPackagesToUser enabledUserNames);
# Define the intake package and a python environment to run it from
intake = intakeCfg.package;
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
# Enable cron
services.cron.enable = true;
# Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: {
${userName} = i + intakeCfg.internalPortStart;
}) enabledUserNames;
userPort = foldl (acc: val: acc // val) { } userPortList;
# Define a user service for each configured user
systemd.services =
let
runScript = userName: pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
${pkgs.apacheHttpd}/bin/htpasswd $@
'';
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != {}) {
enable = true;
virtualHosts."intake" = mkIf (enabledUsers != {}) {
listen = [ intakeCfg.listen ];
locations."/" = {
proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd;
# File locations
intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd";
in
{
# Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ];
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != { }) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let
addPackagesToUser = userName: {
${userName}.packages = [
htpasswdWrapper
intake
] ++ intakeCfg.extraPackages ++ intakeCfg.users.${userName}.extraPackages;
};
in
mkMerge (map addPackagesToUser enabledUserNames);
# Enable cron
services.cron.enable = true;
# Define a user service for each configured user
systemd.services =
let
runScript =
userName:
pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
'';
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in
mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != { }) {
enable = true;
virtualHosts."intake" = mkIf (enabledUsers != { }) {
listen = [ intakeCfg.listen ];
locations."/" = {
proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd;
};
extraConfig = foldl (acc: val: acc + val) "" (
mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort
);
};
extraConfig = foldl (acc: val: acc + val) "" (mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort);
};
};
};
}

View File

@ -1,6 +1,6 @@
[project]
name = "intake"
version = "1.1.0"
version = "1.3.1"
[project.scripts]
intake = "intake.cli:main"

View File

@ -1,10 +1,9 @@
(import
(
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
)
{ src = ./.; }
).shellNix
(import (
let
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
) { src = ./.; }).shellNix

View File

@ -2,6 +2,7 @@
"demo": [
"demo_basic_callback",
"demo_logging",
"demo_raw_sh"
"demo_raw_sh",
"demo_oncreate"
]
}

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import List, Callable
from typing import Generator, Callable
import pytest
@ -14,15 +14,16 @@ def clean_source(source_path: Path):
@pytest.fixture
def using_source() -> Callable:
def using_source() -> Generator[Callable, None, LocalSource]:
test_data = Path(__file__).parent
sources: List[Path] = []
sources: list[Path] = []
def _using_source(name: str):
source_path = test_data / name
clean_source(source_path)
sources.append(source_path)
return LocalSource(test_data, name)
yield _using_source
for source_path in sources:

View File

@ -1,7 +1,9 @@
{
"action": {
"fetch": {
"exe": "true"
"args": [
"true"
]
}
}
}

View File

@ -1,20 +1,20 @@
{
"action": {
"fetch": {
"exe": "./increment.py",
"args": [
"./increment.py",
"fetch"
]
},
"increment": {
"exe": "./increment.py",
"args": [
"./increment.py",
"increment"
]
},
"decrement": {
"exe": "./increment.py",
"args": [
"./increment.py",
"decrement"
]
}

View File

@ -1,8 +1,8 @@
{
"action": {
"fetch": {
"exe": "python3",
"args": [
"python3",
"update.py"
]
}

View File

@ -0,0 +1,22 @@
{
"action": {
"fetch": {
"args": [
"./update.py",
"fetch"
]
},
"update": {
"args": [
"./update.py",
"update"
]
},
"on_create": {
"args": [
"./update.py",
"update"
]
}
}
}

29
tests/demo_oncreate/update.py Executable file
View File

@ -0,0 +1,29 @@
#!/usr/bin/env python3
import argparse, json, sys, time
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(
json.dumps(
{
"id": str(int(time.time())),
"title": "Title has not been updated",
"action": {
"update": 0,
},
}
)
)
if args.action == "update" or args.action == "on_create":
item = sys.stdin.readline()
item = json.loads(item)
item["action"]["update"] += 1
item["title"] = f"Updated {item['action']['update']} times"
print(json.dumps(item))

View File

@ -1,11 +1,11 @@
{
"action": {
"fetch": {
"exe": "sh",
"args": [
"sh",
"-c",
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
]
}
}
}
}

View File

@ -1,8 +1,10 @@
{
"action": {
"fetch": {
"exe": "./update.py",
"args": ["fetch"]
"args": [
"./update.py",
"fetch"
]
}
}
}

View File

@ -1,4 +1,6 @@
import json
from pathlib import Path
import tempfile
from intake.source import fetch_items, update_items, LocalSource
@ -8,6 +10,7 @@ def test_default_source(using_source):
fetch = fetch_items(source)
assert len(fetch) == 0
def test_basic_lifecycle(using_source):
source: LocalSource = using_source("test_inbox")
state = {"inbox": [{"id": "first"}]}
@ -61,3 +64,44 @@ def test_basic_lifecycle(using_source):
items = list(source.get_all_items())
assert len(items) == 1
assert items[0]["id"] == "second"
def test_batch():
with tempfile.TemporaryDirectory() as data_dir:
root = Path(data_dir)
source_dir = root / "batching"
source_dir.mkdir()
config_file = source_dir / "intake.json"
sh_args = [
"python",
"-c",
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
]
batch_config = {
"action": {
"fetch": {
"args": sh_args
}
},
"batch": 0
}
config_file.write_text(json.dumps(batch_config))
source = LocalSource(root, source_dir.name)
# batch sets the tts
fetch1 = fetch_items(source)
assert len(fetch1) == 1
update_items(source, fetch1)
item1 = source.get_item(fetch1[0]["id"])
assert "tts" in item1._item
batch_config["batch"] = 3600
config_file.write_text(json.dumps(batch_config))
fetch2 = fetch_items(source)
assert len(fetch2) == 1
update_items(source, fetch2)
item2 = source.get_item(fetch2[0]["id"])
assert "tts" in item2._item
assert item1["id"] != item2["id"]
assert item2.tts_at == item1.tts_at + 3600