Compare commits

..

No commits in common. "develop" and "master" have entirely different histories.

26 changed files with 281 additions and 553 deletions

4
.gitignore vendored
View File

@ -159,8 +159,6 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
venv
# nixos-shell
nixos.qcow2
@ -168,4 +166,4 @@ nixos.qcow2
result
# test sources
tests/**/*.item
tests/**/*.item

View File

@ -1,18 +0,0 @@
.PHONY: *
help: ## display this help
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
venv: ## link the python dev environment to ./venv
nix build .#pythonEnv -o venv
lint: ## run linters
nix fmt
black **/*.py
test: ## run pytest
python -m pytest
reset: ## delete all items in tests/ and update each source once
rm -v tests/*/*.item
find tests -name intake.json -printf "%h\n" | cut -c7- | xargs -n 1 python -m intake update -d tests -s

View File

@ -31,35 +31,32 @@ intake
{
"action": {
"fetch": {
"args": ["program name", "and", "list", "of", "arguments"]
"exe": "<absolute path to program or name on intake's PATH>",
"args": ["list", "of", "program", "arguments"]
},
"<action name>": {
"exe": "...",
"args": "..."
}
},
"env": {
"...": "..."
},
"cron": "* * * * *",
"batch": "<number>"
"cron": "* * * * *"
}
```
Each key under `action` defines an action that can be taken for the source. A source must at least have a `fetch` action. If an action named `on_create` is defined for the source, it is executed once for an item when that item is created, that is, the first time the item is returned from the source.
Each key under `action` defines an action that can be taken for the source. An action must contain `exe` and may contain `args`. A source must have a `fetch` action.
Each key under `env` defines an environment variable that will be set when `fetch` or other actions are executed.
Each key under `env` defines an environment variable that will be set when actions are executed.
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
## Interface for source programs
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.
To execute an action, intake executes the command given by `args`. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
To execute an action, intake executes the `exe` program for the action with the corresponding `args` (if present) as arguments. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
* intake's environment is inherited.
* `STATE_PATH` is set to the absolute path of `state`.

View File

@ -1,9 +1,10 @@
(import (
let
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
) { src = ./.; }).defaultNix
(import
(
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
)
{ src = ./.; }
).defaultNix

View File

@ -14,7 +14,7 @@
users.users.bob = {
isNormalUser = true;
password = "beta";
uid = 1001;
uid = 1001;
packages = [ pkgs.intake ];
};
@ -26,13 +26,11 @@
};
# Expose the vm's intake revproxy at host port 5234
virtualisation.forwardPorts = [
{
from = "host";
host.port = 5234;
guest.port = 8080;
}
];
virtualisation.forwardPorts = [{
from = "host";
host.port = 5234;
guest.port = 8080;
}];
# Mount the demo content for both users
nixos-shell.mounts = {
@ -49,20 +47,20 @@
# Create an activation script that copies and chowns the demo content
# chmod 777 because the users may not exist when the activation script runs
system.activationScripts =
let
userSetup = name: uid: ''
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
'';
in
{
aliceSetup = userSetup "alice" "1000";
bobSetup = userSetup "bob" "1001";
};
system.activationScripts =
let
userSetup = name: uid: ''
${pkgs.coreutils}/bin/mkdir -p /home/${name}/.local/share/intake
${pkgs.coreutils}/bin/cp -r /mnt/${name}/* /home/${name}/.local/share/intake/
${pkgs.coreutils}/bin/chown -R ${uid} /home/${name}
${pkgs.findutils}/bin/find /home/${name} -type d -exec ${pkgs.coreutils}/bin/chmod 755 {} \;
${pkgs.findutils}/bin/find /home/${name} -type f -exec ${pkgs.coreutils}/bin/chmod 644 {} \;
'';
in
{
aliceSetup = userSetup "alice" "1000";
bobSetup = userSetup "bob" "1001";
};
# Put the demo sources on the global PATH
environment.variables.PATH = "/mnt/sources";

135
flake.nix
View File

@ -13,90 +13,61 @@
nixos-shell.inputs.nixpkgs.follows = "nixpkgs";
};
outputs =
{
self,
nixpkgs,
flake-compat,
nixos-shell,
}:
let
inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (
pypkgs: with pypkgs; [
flask
black
pytest
]
);
in
{
formatter.${system} = nixpkgs.legacyPackages.${system}.nixfmt-rfc-style;
packages.${system} =
let
pkgs = (
import nixpkgs {
inherit system;
overlays = [ self.overlays.default ];
}
);
in
{
default = self.packages.${system}.intake;
inherit (pkgs) intake;
inherit pythonEnv;
};
devShells.${system} = {
default = pkgs.mkShell {
packages = [
pythonEnv
pkgs.gnumake
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
];
shellHook = ''
PS1="(develop) $PS1"
'';
};
};
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path {
path = ./.;
name = "intake";
};
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [
flask
setuptools
];
};
};
templates.source = {
path = builtins.path {
path = ./template;
name = "source";
};
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
outputs = { self, nixpkgs, flake-compat, nixos-shell }:
let
inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux";
in {
packages.${system} = let
pkgs = (import nixpkgs {
inherit system;
modules = [
nixos-shell.nixosModules.nixos-shell
self.nixosModules.default
./demo
overlays = [ self.overlays.default ];
});
in {
default = self.packages.${system}.intake;
inherit (pkgs) intake;
};
devShells.${system} = {
default = let
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (pypkgs: with pypkgs; [ flask black pytest ]);
in pkgs.mkShell {
packages = [
pythonEnv
pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd
];
shellHook = ''
PS1="(develop) $PS1"
'';
};
};
overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage {
name = "intake";
src = builtins.path { path = ./.; name = "intake"; };
format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [ flask setuptools ];
};
};
templates.source = {
path = builtins.path { path = ./template; name = "source"; };
description = "A basic intake source config";
};
nixosModules.default = import ./module.nix self;
nixosConfigurations."demo" = makeOverridable nixosSystem {
inherit system;
modules = [
nixos-shell.nixosModules.nixos-shell
self.nixosModules.default
./demo
];
};
};
}

View File

@ -18,16 +18,9 @@ from flask import (
current_app,
)
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries
from intake.source import (
LocalSource,
execute_action,
Item,
fetch_items,
update_items,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException
from intake.source import LocalSource, execute_action, Item
# Globals
app = Flask(__name__)
@ -122,23 +115,10 @@ def root():
if channels_config_path.exists():
channels = json.loads(channels_config_path.read_text(encoding="utf8"))
channel_counts = {
channel: len(
[
item
for source in sources
for item in LocalSource(data_path, source).get_all_items()
if not item.is_hidden
]
)
for channel, sources in channels.items()
}
return render_template(
"home.jinja2",
sources=sources,
channels=channels,
channel_counts=channel_counts,
)
@ -326,15 +306,13 @@ def _parse_source_config(config_str: str):
if "fetch" not in action:
return ("No fetch action defined", {})
fetch = action["fetch"]
if "exe" not in fetch and not fetch.get("args"):
if "exe" not in fetch:
return ("No fetch exe", {})
config = {"action": parsed["action"]}
if "env" in parsed:
config["env"] = parsed["env"]
if "cron" in parsed:
config["cron"] = parsed["cron"]
if "batch" in parsed:
config["batch"] = parsed["batch"]
return (None, config)
@ -390,24 +368,6 @@ def _parse_channels_config(config_str: str):
return (None, parsed)
@app.post("/fetch/<string:source_name>")
@auth_check
def fetch(source_name: str):
data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
try:
items = fetch_items(source)
update_items(source, items)
titles = "\n".join(f"<li>{item.display_title}</li>" for item in items)
source_url = url_for("source_feed", name=source_name)
return f'Update returned {len(items)} items:<ul>{titles}</ul><p><a href="{source_url}">{source_name}</a></p>'
except InvalidConfigException as ex:
abort(500, f"Could not fetch {source_name}:\n{ex}")
except SourceUpdateException as ex:
abort(500, f"Error updating source {source_name}:\n{ex}")
@app.post("/add")
@auth_check
def add_item():
@ -419,7 +379,7 @@ def add_item():
config_path = source_path / "intake.json"
if not config_path.exists():
config_path.write_text(
json.dumps({"action": {"fetch": {"args": ["true"]}}}, indent=2)
json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2)
)
source = LocalSource(source_path.parent, source_path.name)

View File

@ -10,14 +10,9 @@ import pwd
import subprocess
import sys
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries
from intake.source import (
fetch_items,
LocalSource,
update_items,
execute_action,
intake_data_dir,
)
from intake.source import fetch_items, LocalSource, update_items, execute_action
from intake.types import InvalidConfigException, SourceUpdateException
@ -57,6 +52,7 @@ def cmd_edit(cmd_args):
{
"action": {
"fetch": {
"exe": "",
"args": [],
},
},

12
intake/core.py Normal file
View File

@ -0,0 +1,12 @@
from pathlib import Path
import os
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")

View File

@ -1,6 +1,5 @@
from pathlib import Path
import os
import shutil
import subprocess
import sys
@ -29,13 +28,15 @@ def update_crontab_entries(data_path: Path):
Update the intake-managed section of the user's crontab.
"""
# If there is no crontab command available, quit early.
crontab = shutil.which("crontab")
if not crontab:
print("No crontab found, skipping", file=sys.stderr)
cmd = ("command", "-v", "crontab")
print("Executing", *cmd, file=sys.stderr)
crontab_exists = subprocess.run(cmd, shell=True)
if crontab_exists.returncode:
print("Could not update crontab", file=sys.stderr)
return
# Get the current crontab
cmd = [crontab, "-e"]
cmd = ["crontab", "-e"]
print("Executing", *cmd, file=sys.stderr)
get_crontab = subprocess.run(
cmd,
@ -76,7 +77,7 @@ def update_crontab_entries(data_path: Path):
print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr)
# Save the updated crontab
cmd = [crontab, "-"]
cmd = ["crontab", "-"]
print("Executing", *cmd, file=sys.stderr)
new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8")
save_crontab = subprocess.Popen(

View File

@ -3,7 +3,7 @@ from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread
from time import time as current_time
from typing import Generator
from typing import List
import json
import os
import os.path
@ -12,16 +12,6 @@ import sys
from intake.types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")
class Item:
"""
A wrapper for an item object.
@ -81,31 +71,26 @@ class Item:
# The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity.
if "ttl" in self._item and self.ttl_at > current_time():
return False
if "ttl" in self._item:
ttl_date = self._item["created"] + self._item["ttl"]
if ttl_date > current_time():
return False
# The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active.
if "ttd" in self._item and self.ttd_at < current_time():
return True
if "ttd" in self._item:
ttd_date = self._item["created"] + self._item["ttd"]
if ttd_date < current_time():
return True
return not self._item["active"]
@property
def tts_at(self):
return self._item["created"] + self._item.get("tts", 0)
@property
def ttl_at(self):
return self._item["created"] + self._item.get("ttl", 0)
@property
def ttd_at(self):
return self._item["created"] + self._item.get("ttd", 0)
@property
def before_tts(self):
return "tts" in self._item and current_time() < self.tts_at
return (
"tts" in self._item
and current_time() < self._item["created"] + self._item["tts"]
)
@property
def is_hidden(self):
@ -173,7 +158,7 @@ class LocalSource:
def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> list[str]:
def get_item_ids(self) -> List[str]:
return [
filepath.name[:-5]
for filepath in self.source_path.iterdir()
@ -198,7 +183,7 @@ class LocalSource:
def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id))
def get_all_items(self) -> Generator[Item, None, None]:
def get_all_items(self) -> List[Item]:
for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"):
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
@ -233,7 +218,7 @@ def _read_stderr(process: Popen) -> None:
def _execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta
) -> list[str]:
) -> List[str]:
"""
Execute the action from a given source. If stdin is specified, pass it
along to the process. Returns lines from stdout.
@ -244,13 +229,10 @@ def _execute_source_action(
if not action_cfg:
raise InvalidConfigException(f"No such action {action}")
exe = [action_cfg["exe"]] if "exe" in action_cfg else []
command = exe + action_cfg.get("args", [])
if not command:
if "exe" not in action_cfg:
raise InvalidConfigException(f"No exe for action {action}")
command = [action_cfg["exe"], *action_cfg.get("args", [])]
config_env = {key: str(value) for key, value in config.get("env", {}).items()}
env = {
**os.environ.copy(),
@ -301,13 +283,13 @@ def _execute_source_action(
return output
def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]:
def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]:
"""
Execute the feed source and return the current feed items.
Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed.
"""
items: list[Item] = []
items: List[Item] = []
output = _execute_source_action(source, "fetch", None, timedelta(timeout))
@ -343,54 +325,29 @@ def execute_action(
raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items: list[Item]):
def update_items(source: LocalSource, fetched_items: List[Item]):
"""
Update the source with a batch of new items, doing creations, updates, and
deletions as necessary.
"""
config = source.get_config()
# Get a list of item ids that already existed for this source.
prior_ids = source.get_item_ids()
print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
# Determine which items are new and which are updates.
new_items: list[Item] = []
upd_items: list[Item] = []
new_items: List[Item] = []
upd_items: List[Item] = []
for item in fetched_items:
if source.item_exists(item["id"]):
upd_items.append(item)
else:
new_items.append(item)
# If the source is batched, set the tts on new items to at least the batch tts
if "batch" in config:
try:
batch_adj = int(config["batch"])
now = current_time() - batch_adj
batch_start = now - (now % 86400)
batch_end = batch_start + 86400 + batch_adj
for item in new_items:
min_tts = batch_end - item["created"]
item["tts"] = min(min_tts, item.get("tts", min_tts))
except:
pass
# Write all the new items to the source directory.
for item in new_items:
# TODO: support on-create trigger
source.save_item(item)
# If the source has an on-create trigger, run it for new items
if "on_create" in config.get("action", {}):
for item in new_items:
try:
execute_action(source, item["id"], "on_create")
except Exception as ex:
print(
f"on_create failed for {source.source_name}/{item['id']}:\n{ex}",
file=sys.stderr,
)
# Update the other items using the fetched items' values.
for upd_item in upd_items:
old_item = source.get_item(upd_item["id"])

View File

@ -174,13 +174,11 @@ var doAction = function (source, itemid, action) {
{# source/id/created footer line #}
{% if item.source or item.id or item.created %}
<span class="item-info" title="{% if item.tags %}{{ 'Tags: {}'.format(', '.join(item.tags)) }}{% else %}No tags{% endif %}">
<span class="item-info" title="{{ 'Tags: {}'.format(', '.join(item.tags)) }}">
{% if item.source %}{{item.source}}{% endif %}
{% if item.id %}{{item.id}}{% endif %}
{% if item.created %}{{item.created|datetimeformat}}{% endif %}
{% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %}
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
{% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %}
</span>
{% endif %}

View File

@ -34,12 +34,6 @@ summary:focus {
width: 100%;
resize: vertical;
}
.intake-sources td {
padding-block: 0.4em;
}
.intake-sources form {
margin: 0
}
</style>
</head>
<body>
@ -52,13 +46,7 @@ summary:focus {
<p>No channels found.</p>
{% else %}
{% for channel in channels %}
<p><a href="{{ url_for('channel_feed', name=channel) }}">
{%- if channel_counts[channel] -%}
({{ channel_counts[channel] }}) {{ channel }}
{%- else -%}
{{ channel }}
{%- endif -%}
</a></p>
<p><a href="{{ url_for('channel_feed', name=channel) }}">{{ channel }}</a></p>
{% endfor %}
{% endif %}
<p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p>
@ -71,22 +59,17 @@ summary:focus {
{% if not sources %}
<p>No sources found.</p>
{% else %}
<table class="intake-sources">
{% for source in sources %}
<tr>
<td>
<p>
{%- for channel, srcs in channels|items -%}
{%- if source.source_name in srcs -%}
^
{%- endif -%}
{%- endfor -%}
</td>
<td><form id="{{ source.source_name }}"><button type="submit" formmethod=post formaction="{{ url_for('fetch', source_name=source.source_name) }}" />fetch</button></form></td>
<td>(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)</td>
<td><a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a></td>
</tr>
<a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a>
(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)
</p>
{% endfor %}
</table>
{% endif %}
</details>
</article>

View File

@ -1,179 +1,153 @@
flake:
{
config,
lib,
pkgs,
...
}:
flake: { config, lib, pkgs, ... }:
let
inherit (lib)
filterAttrs
foldl
imap1
mapAttrsToList
mkEnableOption
mkIf
mkMerge
mkOption
mkPackageOption
types
;
inherit (lib) filterAttrs foldl imap1 mapAttrsToList mkEnableOption mkIf mkMerge mkOption mkPackageOption types;
intakeCfg = config.services.intake;
in
{
in {
options = {
services.intake = {
listen.addr = mkOption {
type = types.str;
default = "0.0.0.0";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a "
"local port based on the request's HTTP Basic Auth credentials.";
};
listen.port = mkOption {
type = types.port;
default = 80;
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials.";
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local "
"port based on the request's HTTP Basic Auth credentials.";
};
package = mkPackageOption pkgs "intake" { };
package = mkPackageOption pkgs "intake" {};
internalPortStart = mkOption {
type = types.port;
default = 24130;
description = "The first port to use for internal service endpoints. A number of ports will be continguously allocated equal to the number of users with enabled intake services.";
description = "The first port to use for internal service endpoints. A number of ports will be continguously "
"allocated equal to the number of users with enabled intake services.";
};
extraPackages = mkOption {
type = types.listOf types.package;
default = [ ];
default = [];
description = "Extra packages available to all enabled users and their intake services.";
};
users = mkOption {
description = "User intake service definitions.";
default = { };
type = types.attrsOf (
types.submodule {
options = {
enable = mkEnableOption "intake, a personal feed aggregator.";
default = {};
type = types.attrsOf (types.submodule {
options = {
enable = mkEnableOption "intake, a personal feed aggregator.";
extraPackages = mkOption {
type = types.listOf types.package;
default = [ ];
description = "Extra packages available to this user and their intake service.";
};
extraPackages = mkOption {
type = types.listOf types.package;
default = [];
description = "Extra packages available to this user and their intake service.";
};
}
);
};
});
};
};
};
config =
let
# Define the intake package and a python environment to run it from
intake = intakeCfg.package;
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
# Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: { ${userName} = i + intakeCfg.internalPortStart; }) enabledUserNames;
userPort = foldl (acc: val: acc // val) {} userPortList;
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
${pkgs.apacheHttpd}/bin/htpasswd $@
'';
# File locations
intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd";
in {
# Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ];
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != {}) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let
# Define the intake package and a python environment to run it from
intake = intakeCfg.package;
pythonEnv = pkgs.python3.withPackages (pypkgs: [ intake ]);
addPackagesToUser = userName: {
${userName}.packages =
[ htpasswdWrapper intake ]
++ intakeCfg.extraPackages
++ intakeCfg.users.${userName}.extraPackages;
};
in mkMerge (map addPackagesToUser enabledUserNames);
# Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: {
${userName} = i + intakeCfg.internalPortStart;
}) enabledUserNames;
userPort = foldl (acc: val: acc // val) { } userPortList;
# Enable cron
services.cron.enable = true;
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper
htpasswdWrapper = pkgs.writeShellScriptBin "htpasswd" ''
${pkgs.apacheHttpd}/bin/htpasswd $@
# Define a user service for each configured user
systemd.services =
let
runScript = userName: pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
'';
# File locations
intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd";
in
{
# Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ];
# Define a user group for access to the htpasswd file. nginx needs to be able to read it.
users.groups.intake.members = mkIf (enabledUsers != { }) (enabledUserNames ++ [ "nginx" ]);
# Define an activation script that ensures that the htpasswd file exists.
system.activationScripts.etc-intake = ''
if [ ! -e ${intakeDir} ]; then
${pkgs.coreutils}/bin/mkdir -p ${intakeDir};
fi
${pkgs.coreutils}/bin/chown root:root ${intakeDir}
${pkgs.coreutils}/bin/chmod 755 ${intakeDir}
if [ ! -e ${intakePwd} ]; then
${pkgs.coreutils}/bin/touch ${intakePwd}
fi
${pkgs.coreutils}/bin/chown root:intake ${intakePwd}
${pkgs.coreutils}/bin/chmod 660 ${intakePwd}
'';
# Give every intake user the htpasswd wrapper, the shared packages, and the user-specific packages.
users.users =
let
addPackagesToUser = userName: {
${userName}.packages = [
htpasswdWrapper
intake
] ++ intakeCfg.extraPackages ++ intakeCfg.users.${userName}.extraPackages;
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
in
mkMerge (map addPackagesToUser enabledUserNames);
# Enable cron
services.cron.enable = true;
# Define a user service for each configured user
systemd.services =
let
runScript =
userName:
pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
'';
# systemd service definition for a single user, given `services.intake.users.userName` = `userCfg`
userServiceConfig = userName: userCfg: {
"intake@${userName}" = {
description = "Intake service for user ${userName}";
script = "${runScript userName}";
path = intakeCfg.extraPackages ++ userCfg.extraPackages;
serviceConfig = {
User = userName;
Type = "simple";
};
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in
mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != { }) {
enable = true;
virtualHosts."intake" = mkIf (enabledUsers != { }) {
listen = [ intakeCfg.listen ];
locations."/" = {
proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd;
};
extraConfig = foldl (acc: val: acc + val) "" (
mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort
);
wantedBy = [ "multi-user.target" ];
after = [ "network.target" ];
enable = userCfg.enable;
};
};
in mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != {}) {
enable = true;
virtualHosts."intake" = mkIf (enabledUsers != {}) {
listen = [ intakeCfg.listen ];
locations."/" = {
proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd;
};
extraConfig = foldl (acc: val: acc + val) "" (mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") {
set $target_port ${toString port};
}
'') userPort);
};
};
};
}

View File

@ -1,6 +1,6 @@
[project]
name = "intake"
version = "1.3.1"
version = "1.1.0"
[project.scripts]
intake = "intake.cli:main"

View File

@ -1,9 +1,10 @@
(import (
let
lock = builtins.fromJSON (builtins.readFile ./flake.lock);
in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
) { src = ./.; }).shellNix
(import
(
let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash;
}
)
{ src = ./.; }
).shellNix

View File

@ -2,7 +2,6 @@
"demo": [
"demo_basic_callback",
"demo_logging",
"demo_raw_sh",
"demo_oncreate"
"demo_raw_sh"
]
}

View File

@ -1,5 +1,5 @@
from pathlib import Path
from typing import Generator, Callable
from typing import List, Callable
import pytest
@ -14,16 +14,15 @@ def clean_source(source_path: Path):
@pytest.fixture
def using_source() -> Generator[Callable, None, LocalSource]:
def using_source() -> Callable:
test_data = Path(__file__).parent
sources: list[Path] = []
sources: List[Path] = []
def _using_source(name: str):
source_path = test_data / name
clean_source(source_path)
sources.append(source_path)
return LocalSource(test_data, name)
yield _using_source
for source_path in sources:

View File

@ -1,9 +1,7 @@
{
"action": {
"fetch": {
"args": [
"true"
]
"exe": "true"
}
}
}

View File

@ -1,20 +1,20 @@
{
"action": {
"fetch": {
"exe": "./increment.py",
"args": [
"./increment.py",
"fetch"
]
},
"increment": {
"exe": "./increment.py",
"args": [
"./increment.py",
"increment"
]
},
"decrement": {
"exe": "./increment.py",
"args": [
"./increment.py",
"decrement"
]
}

View File

@ -1,8 +1,8 @@
{
"action": {
"fetch": {
"exe": "python3",
"args": [
"python3",
"update.py"
]
}

View File

@ -1,22 +0,0 @@
{
"action": {
"fetch": {
"args": [
"./update.py",
"fetch"
]
},
"update": {
"args": [
"./update.py",
"update"
]
},
"on_create": {
"args": [
"./update.py",
"update"
]
}
}
}

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python3
import argparse, json, sys, time
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(
json.dumps(
{
"id": str(int(time.time())),
"title": "Title has not been updated",
"action": {
"update": 0,
},
}
)
)
if args.action == "update" or args.action == "on_create":
item = sys.stdin.readline()
item = json.loads(item)
item["action"]["update"] += 1
item["title"] = f"Updated {item['action']['update']} times"
print(json.dumps(item))

View File

@ -1,11 +1,11 @@
{
"action": {
"fetch": {
"exe": "sh",
"args": [
"sh",
"-c",
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
]
}
}
}
}

View File

@ -1,10 +1,8 @@
{
"action": {
"fetch": {
"args": [
"./update.py",
"fetch"
]
"exe": "./update.py",
"args": ["fetch"]
}
}
}

View File

@ -1,6 +1,4 @@
import json
from pathlib import Path
import tempfile
from intake.source import fetch_items, update_items, LocalSource
@ -10,7 +8,6 @@ def test_default_source(using_source):
fetch = fetch_items(source)
assert len(fetch) == 0
def test_basic_lifecycle(using_source):
source: LocalSource = using_source("test_inbox")
state = {"inbox": [{"id": "first"}]}
@ -64,44 +61,3 @@ def test_basic_lifecycle(using_source):
items = list(source.get_all_items())
assert len(items) == 1
assert items[0]["id"] == "second"
def test_batch():
with tempfile.TemporaryDirectory() as data_dir:
root = Path(data_dir)
source_dir = root / "batching"
source_dir.mkdir()
config_file = source_dir / "intake.json"
sh_args = [
"python",
"-c",
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
]
batch_config = {
"action": {
"fetch": {
"args": sh_args
}
},
"batch": 0
}
config_file.write_text(json.dumps(batch_config))
source = LocalSource(root, source_dir.name)
# batch sets the tts
fetch1 = fetch_items(source)
assert len(fetch1) == 1
update_items(source, fetch1)
item1 = source.get_item(fetch1[0]["id"])
assert "tts" in item1._item
batch_config["batch"] = 3600
config_file.write_text(json.dumps(batch_config))
fetch2 = fetch_items(source)
assert len(fetch2) == 1
update_items(source, fetch2)
item2 = source.get_item(fetch2[0]["id"])
assert "tts" in item2._item
assert item1["id"] != item2["id"]
assert item2.tts_at == item1.tts_at + 3600