Compare commits

..

No commits in common. "develop" and "master" have entirely different histories.

26 changed files with 281 additions and 553 deletions

2
.gitignore vendored
View File

@ -159,8 +159,6 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
venv
# nixos-shell # nixos-shell
nixos.qcow2 nixos.qcow2

View File

@ -1,18 +0,0 @@
.PHONY: *
help: ## display this help
@awk 'BEGIN{FS = ":.*##"; printf "\033[1m\nUsage\n \033[1;92m make\033[0;36m <target>\033[0m\n"} /^[a-zA-Z0-9_-]+:.*?##/ { printf " \033[36m%-15s\033[0m %s\n", $$1, $$2 } ' $(MAKEFILE_LIST)
venv: ## link the python dev environment to ./venv
nix build .#pythonEnv -o venv
lint: ## run linters
nix fmt
black **/*.py
test: ## run pytest
python -m pytest
reset: ## delete all items in tests/ and update each source once
rm -v tests/*/*.item
find tests -name intake.json -printf "%h\n" | cut -c7- | xargs -n 1 python -m intake update -d tests -s

View File

@ -31,35 +31,32 @@ intake
{ {
"action": { "action": {
"fetch": { "fetch": {
"args": ["program name", "and", "list", "of", "arguments"] "exe": "<absolute path to program or name on intake's PATH>",
"args": ["list", "of", "program", "arguments"]
}, },
"<action name>": { "<action name>": {
"exe": "...",
"args": "..." "args": "..."
} }
}, },
"env": { "env": {
"...": "..." "...": "..."
}, },
"cron": "* * * * *", "cron": "* * * * *"
"batch": "<number>"
} }
``` ```
Each key under `action` defines an action that can be taken for the source. A source must at least have a `fetch` action. If an action named `on_create` is defined for the source, it is executed once for an item when that item is created, that is, the first time the item is returned from the source. Each key under `action` defines an action that can be taken for the source. An action must contain `exe` and may contain `args`. A source must have a `fetch` action.
Each key under `env` defines an environment variable that will be set when `fetch` or other actions are executed. Each key under `env` defines an environment variable that will be set when actions are executed.
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule. If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
## Interface for source programs ## Interface for source programs
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed. Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.
To execute an action, intake executes the command given by `args`. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows: To execute an action, intake executes the `exe` program for the action with the corresponding `args` (if present) as arguments. The process's working directory is set to the source's folder, i.e. the folder containing `intake.json`. The process's environment is as follows:
* intake's environment is inherited. * intake's environment is inherited.
* `STATE_PATH` is set to the absolute path of `state`. * `STATE_PATH` is set to the absolute path of `state`.

View File

@ -1,9 +1,10 @@
(import ( (import
let (
lock = builtins.fromJSON (builtins.readFile ./flake.lock); let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
in
fetchTarball { fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash; sha256 = lock.nodes.flake-compat.locked.narHash;
} }
) { src = ./.; }).defaultNix )
{ src = ./.; }
).defaultNix

View File

@ -26,13 +26,11 @@
}; };
# Expose the vm's intake revproxy at host port 5234 # Expose the vm's intake revproxy at host port 5234
virtualisation.forwardPorts = [ virtualisation.forwardPorts = [{
{
from = "host"; from = "host";
host.port = 5234; host.port = 5234;
guest.port = 8080; guest.port = 8080;
} }];
];
# Mount the demo content for both users # Mount the demo content for both users
nixos-shell.mounts = { nixos-shell.mounts = {

View File

@ -13,48 +13,28 @@
nixos-shell.inputs.nixpkgs.follows = "nixpkgs"; nixos-shell.inputs.nixpkgs.follows = "nixpkgs";
}; };
outputs = outputs = { self, nixpkgs, flake-compat, nixos-shell }:
{
self,
nixpkgs,
flake-compat,
nixos-shell,
}:
let let
inherit (nixpkgs.lib) makeOverridable nixosSystem; inherit (nixpkgs.lib) makeOverridable nixosSystem;
system = "x86_64-linux"; system = "x86_64-linux";
pkgs = nixpkgs.legacyPackages.${system}; in {
pythonEnv = pkgs.python3.withPackages ( packages.${system} = let
pypkgs: with pypkgs; [ pkgs = (import nixpkgs {
flask
black
pytest
]
);
in
{
formatter.${system} = nixpkgs.legacyPackages.${system}.nixfmt-rfc-style;
packages.${system} =
let
pkgs = (
import nixpkgs {
inherit system; inherit system;
overlays = [ self.overlays.default ]; overlays = [ self.overlays.default ];
} });
); in {
in
{
default = self.packages.${system}.intake; default = self.packages.${system}.intake;
inherit (pkgs) intake; inherit (pkgs) intake;
inherit pythonEnv;
}; };
devShells.${system} = { devShells.${system} = {
default = pkgs.mkShell { default = let
pkgs = nixpkgs.legacyPackages.${system};
pythonEnv = pkgs.python3.withPackages (pypkgs: with pypkgs; [ flask black pytest ]);
in pkgs.mkShell {
packages = [ packages = [
pythonEnv pythonEnv
pkgs.gnumake
pkgs.nixos-shell pkgs.nixos-shell
# We only take this dependency for htpasswd, which is a little unfortunate # We only take this dependency for htpasswd, which is a little unfortunate
pkgs.apacheHttpd pkgs.apacheHttpd
@ -68,23 +48,14 @@
overlays.default = final: prev: { overlays.default = final: prev: {
intake = final.python3Packages.buildPythonPackage { intake = final.python3Packages.buildPythonPackage {
name = "intake"; name = "intake";
src = builtins.path { src = builtins.path { path = ./.; name = "intake"; };
path = ./.;
name = "intake";
};
format = "pyproject"; format = "pyproject";
propagatedBuildInputs = with final.python3Packages; [ propagatedBuildInputs = with final.python3Packages; [ flask setuptools ];
flask
setuptools
];
}; };
}; };
templates.source = { templates.source = {
path = builtins.path { path = builtins.path { path = ./template; name = "source"; };
path = ./template;
name = "source";
};
description = "A basic intake source config"; description = "A basic intake source config";
}; };

View File

@ -18,16 +18,9 @@ from flask import (
current_app, current_app,
) )
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries from intake.crontab import update_crontab_entries
from intake.source import ( from intake.source import LocalSource, execute_action, Item
LocalSource,
execute_action,
Item,
fetch_items,
update_items,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException
# Globals # Globals
app = Flask(__name__) app = Flask(__name__)
@ -122,23 +115,10 @@ def root():
if channels_config_path.exists(): if channels_config_path.exists():
channels = json.loads(channels_config_path.read_text(encoding="utf8")) channels = json.loads(channels_config_path.read_text(encoding="utf8"))
channel_counts = {
channel: len(
[
item
for source in sources
for item in LocalSource(data_path, source).get_all_items()
if not item.is_hidden
]
)
for channel, sources in channels.items()
}
return render_template( return render_template(
"home.jinja2", "home.jinja2",
sources=sources, sources=sources,
channels=channels, channels=channels,
channel_counts=channel_counts,
) )
@ -326,15 +306,13 @@ def _parse_source_config(config_str: str):
if "fetch" not in action: if "fetch" not in action:
return ("No fetch action defined", {}) return ("No fetch action defined", {})
fetch = action["fetch"] fetch = action["fetch"]
if "exe" not in fetch and not fetch.get("args"): if "exe" not in fetch:
return ("No fetch exe", {}) return ("No fetch exe", {})
config = {"action": parsed["action"]} config = {"action": parsed["action"]}
if "env" in parsed: if "env" in parsed:
config["env"] = parsed["env"] config["env"] = parsed["env"]
if "cron" in parsed: if "cron" in parsed:
config["cron"] = parsed["cron"] config["cron"] = parsed["cron"]
if "batch" in parsed:
config["batch"] = parsed["batch"]
return (None, config) return (None, config)
@ -390,24 +368,6 @@ def _parse_channels_config(config_str: str):
return (None, parsed) return (None, parsed)
@app.post("/fetch/<string:source_name>")
@auth_check
def fetch(source_name: str):
data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
try:
items = fetch_items(source)
update_items(source, items)
titles = "\n".join(f"<li>{item.display_title}</li>" for item in items)
source_url = url_for("source_feed", name=source_name)
return f'Update returned {len(items)} items:<ul>{titles}</ul><p><a href="{source_url}">{source_name}</a></p>'
except InvalidConfigException as ex:
abort(500, f"Could not fetch {source_name}:\n{ex}")
except SourceUpdateException as ex:
abort(500, f"Error updating source {source_name}:\n{ex}")
@app.post("/add") @app.post("/add")
@auth_check @auth_check
def add_item(): def add_item():
@ -419,7 +379,7 @@ def add_item():
config_path = source_path / "intake.json" config_path = source_path / "intake.json"
if not config_path.exists(): if not config_path.exists():
config_path.write_text( config_path.write_text(
json.dumps({"action": {"fetch": {"args": ["true"]}}}, indent=2) json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2)
) )
source = LocalSource(source_path.parent, source_path.name) source = LocalSource(source_path.parent, source_path.name)

View File

@ -10,14 +10,9 @@ import pwd
import subprocess import subprocess
import sys import sys
from intake.core import intake_data_dir
from intake.crontab import update_crontab_entries from intake.crontab import update_crontab_entries
from intake.source import ( from intake.source import fetch_items, LocalSource, update_items, execute_action
fetch_items,
LocalSource,
update_items,
execute_action,
intake_data_dir,
)
from intake.types import InvalidConfigException, SourceUpdateException from intake.types import InvalidConfigException, SourceUpdateException
@ -57,6 +52,7 @@ def cmd_edit(cmd_args):
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "",
"args": [], "args": [],
}, },
}, },

12
intake/core.py Normal file
View File

@ -0,0 +1,12 @@
from pathlib import Path
import os
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")

View File

@ -1,6 +1,5 @@
from pathlib import Path from pathlib import Path
import os import os
import shutil
import subprocess import subprocess
import sys import sys
@ -29,13 +28,15 @@ def update_crontab_entries(data_path: Path):
Update the intake-managed section of the user's crontab. Update the intake-managed section of the user's crontab.
""" """
# If there is no crontab command available, quit early. # If there is no crontab command available, quit early.
crontab = shutil.which("crontab") cmd = ("command", "-v", "crontab")
if not crontab: print("Executing", *cmd, file=sys.stderr)
print("No crontab found, skipping", file=sys.stderr) crontab_exists = subprocess.run(cmd, shell=True)
if crontab_exists.returncode:
print("Could not update crontab", file=sys.stderr)
return return
# Get the current crontab # Get the current crontab
cmd = [crontab, "-e"] cmd = ["crontab", "-e"]
print("Executing", *cmd, file=sys.stderr) print("Executing", *cmd, file=sys.stderr)
get_crontab = subprocess.run( get_crontab = subprocess.run(
cmd, cmd,
@ -76,7 +77,7 @@ def update_crontab_entries(data_path: Path):
print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr) print("Updating", len(new_crontab_lines) - 2, "crontab entries", file=sys.stderr)
# Save the updated crontab # Save the updated crontab
cmd = [crontab, "-"] cmd = ["crontab", "-"]
print("Executing", *cmd, file=sys.stderr) print("Executing", *cmd, file=sys.stderr)
new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8") new_crontab: bytes = "\n".join(new_crontab_lines).encode("utf8")
save_crontab = subprocess.Popen( save_crontab = subprocess.Popen(

View File

@ -3,7 +3,7 @@ from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread from threading import Thread
from time import time as current_time from time import time as current_time
from typing import Generator from typing import List
import json import json
import os import os
import os.path import os.path
@ -12,16 +12,6 @@ import sys
from intake.types import InvalidConfigException, SourceUpdateException from intake.types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")
class Item: class Item:
""" """
A wrapper for an item object. A wrapper for an item object.
@ -81,31 +71,26 @@ class Item:
# The time-to-live fields protects an item from removal until expiry. # The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source # This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity. # cannot guarantee monotonocity.
if "ttl" in self._item and self.ttl_at > current_time(): if "ttl" in self._item:
ttl_date = self._item["created"] + self._item["ttl"]
if ttl_date > current_time():
return False return False
# The time-to-die field puts a maximum lifespan on an item, removing it # The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active. # even if it is active.
if "ttd" in self._item and self.ttd_at < current_time(): if "ttd" in self._item:
ttd_date = self._item["created"] + self._item["ttd"]
if ttd_date < current_time():
return True return True
return not self._item["active"] return not self._item["active"]
@property
def tts_at(self):
return self._item["created"] + self._item.get("tts", 0)
@property
def ttl_at(self):
return self._item["created"] + self._item.get("ttl", 0)
@property
def ttd_at(self):
return self._item["created"] + self._item.get("ttd", 0)
@property @property
def before_tts(self): def before_tts(self):
return "tts" in self._item and current_time() < self.tts_at return (
"tts" in self._item
and current_time() < self._item["created"] + self._item["tts"]
)
@property @property
def is_hidden(self): def is_hidden(self):
@ -173,7 +158,7 @@ class LocalSource:
def get_item_path(self, item_id: dict) -> Path: def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item" return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> list[str]: def get_item_ids(self) -> List[str]:
return [ return [
filepath.name[:-5] filepath.name[:-5]
for filepath in self.source_path.iterdir() for filepath in self.source_path.iterdir()
@ -198,7 +183,7 @@ class LocalSource:
def delete_item(self, item_id) -> None: def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id)) os.remove(self.get_item_path(item_id))
def get_all_items(self) -> Generator[Item, None, None]: def get_all_items(self) -> List[Item]:
for filepath in self.source_path.iterdir(): for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"): if filepath.name.endswith(".item"):
yield Item(self, json.loads(filepath.read_text(encoding="utf8"))) yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
@ -233,7 +218,7 @@ def _read_stderr(process: Popen) -> None:
def _execute_source_action( def _execute_source_action(
source: LocalSource, action: str, input: str, timeout: timedelta source: LocalSource, action: str, input: str, timeout: timedelta
) -> list[str]: ) -> List[str]:
""" """
Execute the action from a given source. If stdin is specified, pass it Execute the action from a given source. If stdin is specified, pass it
along to the process. Returns lines from stdout. along to the process. Returns lines from stdout.
@ -244,13 +229,10 @@ def _execute_source_action(
if not action_cfg: if not action_cfg:
raise InvalidConfigException(f"No such action {action}") raise InvalidConfigException(f"No such action {action}")
if "exe" not in action_cfg:
exe = [action_cfg["exe"]] if "exe" in action_cfg else []
command = exe + action_cfg.get("args", [])
if not command:
raise InvalidConfigException(f"No exe for action {action}") raise InvalidConfigException(f"No exe for action {action}")
command = [action_cfg["exe"], *action_cfg.get("args", [])]
config_env = {key: str(value) for key, value in config.get("env", {}).items()} config_env = {key: str(value) for key, value in config.get("env", {}).items()}
env = { env = {
**os.environ.copy(), **os.environ.copy(),
@ -301,13 +283,13 @@ def _execute_source_action(
return output return output
def fetch_items(source: LocalSource, timeout: int = 60) -> list[Item]: def fetch_items(source: LocalSource, timeout: int = 60) -> List[Item]:
""" """
Execute the feed source and return the current feed items. Execute the feed source and return the current feed items.
Returns a list of feed items on success. Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed. Throws SourceUpdateException if the feed source update failed.
""" """
items: list[Item] = [] items: List[Item] = []
output = _execute_source_action(source, "fetch", None, timedelta(timeout)) output = _execute_source_action(source, "fetch", None, timedelta(timeout))
@ -343,54 +325,29 @@ def execute_action(
raise SourceUpdateException("invalid json") raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items: list[Item]): def update_items(source: LocalSource, fetched_items: List[Item]):
""" """
Update the source with a batch of new items, doing creations, updates, and Update the source with a batch of new items, doing creations, updates, and
deletions as necessary. deletions as necessary.
""" """
config = source.get_config()
# Get a list of item ids that already existed for this source. # Get a list of item ids that already existed for this source.
prior_ids = source.get_item_ids() prior_ids = source.get_item_ids()
print(f"Found {len(prior_ids)} prior items", file=sys.stderr) print(f"Found {len(prior_ids)} prior items", file=sys.stderr)
# Determine which items are new and which are updates. # Determine which items are new and which are updates.
new_items: list[Item] = [] new_items: List[Item] = []
upd_items: list[Item] = [] upd_items: List[Item] = []
for item in fetched_items: for item in fetched_items:
if source.item_exists(item["id"]): if source.item_exists(item["id"]):
upd_items.append(item) upd_items.append(item)
else: else:
new_items.append(item) new_items.append(item)
# If the source is batched, set the tts on new items to at least the batch tts
if "batch" in config:
try:
batch_adj = int(config["batch"])
now = current_time() - batch_adj
batch_start = now - (now % 86400)
batch_end = batch_start + 86400 + batch_adj
for item in new_items:
min_tts = batch_end - item["created"]
item["tts"] = min(min_tts, item.get("tts", min_tts))
except:
pass
# Write all the new items to the source directory. # Write all the new items to the source directory.
for item in new_items: for item in new_items:
# TODO: support on-create trigger
source.save_item(item) source.save_item(item)
# If the source has an on-create trigger, run it for new items
if "on_create" in config.get("action", {}):
for item in new_items:
try:
execute_action(source, item["id"], "on_create")
except Exception as ex:
print(
f"on_create failed for {source.source_name}/{item['id']}:\n{ex}",
file=sys.stderr,
)
# Update the other items using the fetched items' values. # Update the other items using the fetched items' values.
for upd_item in upd_items: for upd_item in upd_items:
old_item = source.get_item(upd_item["id"]) old_item = source.get_item(upd_item["id"])

View File

@ -174,13 +174,11 @@ var doAction = function (source, itemid, action) {
{# source/id/created footer line #} {# source/id/created footer line #}
{% if item.source or item.id or item.created %} {% if item.source or item.id or item.created %}
<span class="item-info" title="{% if item.tags %}{{ 'Tags: {}'.format(', '.join(item.tags)) }}{% else %}No tags{% endif %}"> <span class="item-info" title="{{ 'Tags: {}'.format(', '.join(item.tags)) }}">
{% if item.source %}{{item.source}}{% endif %} {% if item.source %}{{item.source}}{% endif %}
{% if item.id %}{{item.id}}{% endif %} {% if item.id %}{{item.id}}{% endif %}
{% if item.created %}{{item.created|datetimeformat}}{% endif %} {% if item.created %}{{item.created|datetimeformat}}{% endif %}
{% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %} {% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %}
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
</span> </span>
{% endif %} {% endif %}

View File

@ -34,12 +34,6 @@ summary:focus {
width: 100%; width: 100%;
resize: vertical; resize: vertical;
} }
.intake-sources td {
padding-block: 0.4em;
}
.intake-sources form {
margin: 0
}
</style> </style>
</head> </head>
<body> <body>
@ -52,13 +46,7 @@ summary:focus {
<p>No channels found.</p> <p>No channels found.</p>
{% else %} {% else %}
{% for channel in channels %} {% for channel in channels %}
<p><a href="{{ url_for('channel_feed', name=channel) }}"> <p><a href="{{ url_for('channel_feed', name=channel) }}">{{ channel }}</a></p>
{%- if channel_counts[channel] -%}
({{ channel_counts[channel] }}) {{ channel }}
{%- else -%}
{{ channel }}
{%- endif -%}
</a></p>
{% endfor %} {% endfor %}
{% endif %} {% endif %}
<p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p> <p><a href="{{ url_for('channels_edit') }}">Edit channels</a></p>
@ -71,22 +59,17 @@ summary:focus {
{% if not sources %} {% if not sources %}
<p>No sources found.</p> <p>No sources found.</p>
{% else %} {% else %}
<table class="intake-sources">
{% for source in sources %} {% for source in sources %}
<tr> <p>
<td>
{%- for channel, srcs in channels|items -%} {%- for channel, srcs in channels|items -%}
{%- if source.source_name in srcs -%} {%- if source.source_name in srcs -%}
^ ^
{%- endif -%} {%- endif -%}
{%- endfor -%} {%- endfor -%}
</td> <a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a>
<td><form id="{{ source.source_name }}"><button type="submit" formmethod=post formaction="{{ url_for('fetch', source_name=source.source_name) }}" />fetch</button></form></td> (<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)
<td>(<a href="{{ url_for('source_edit', name=source.source_name) }}">edit</a>)</td> </p>
<td><a href="{{ url_for('source_feed', name=source.source_name) }}">{{ source.source_name|safe }}</a></td>
</tr>
{% endfor %} {% endfor %}
</table>
{% endif %} {% endif %}
</details> </details>
</article> </article>

View File

@ -1,39 +1,23 @@
flake: flake: { config, lib, pkgs, ... }:
{
config,
lib,
pkgs,
...
}:
let let
inherit (lib) inherit (lib) filterAttrs foldl imap1 mapAttrsToList mkEnableOption mkIf mkMerge mkOption mkPackageOption types;
filterAttrs
foldl
imap1
mapAttrsToList
mkEnableOption
mkIf
mkMerge
mkOption
mkPackageOption
types
;
intakeCfg = config.services.intake; intakeCfg = config.services.intake;
in in {
{
options = { options = {
services.intake = { services.intake = {
listen.addr = mkOption { listen.addr = mkOption {
type = types.str; type = types.str;
default = "0.0.0.0"; default = "0.0.0.0";
description = "The listen address for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials."; description = "The listen address for the entry point to intake services. This endpoint will redirect to a "
"local port based on the request's HTTP Basic Auth credentials.";
}; };
listen.port = mkOption { listen.port = mkOption {
type = types.port; type = types.port;
default = 80; default = 80;
description = "The listen port for the entry point to intake services. This endpoint will redirect to a local port based on the request's HTTP Basic Auth credentials."; description = "The listen port for the entry point to intake services. This endpoint will redirect to a local "
"port based on the request's HTTP Basic Auth credentials.";
}; };
package = mkPackageOption pkgs "intake" {}; package = mkPackageOption pkgs "intake" {};
@ -41,7 +25,8 @@ in
internalPortStart = mkOption { internalPortStart = mkOption {
type = types.port; type = types.port;
default = 24130; default = 24130;
description = "The first port to use for internal service endpoints. A number of ports will be continguously allocated equal to the number of users with enabled intake services."; description = "The first port to use for internal service endpoints. A number of ports will be continguously "
"allocated equal to the number of users with enabled intake services.";
}; };
extraPackages = mkOption { extraPackages = mkOption {
@ -53,8 +38,7 @@ in
users = mkOption { users = mkOption {
description = "User intake service definitions."; description = "User intake service definitions.";
default = {}; default = {};
type = types.attrsOf ( type = types.attrsOf (types.submodule {
types.submodule {
options = { options = {
enable = mkEnableOption "intake, a personal feed aggregator."; enable = mkEnableOption "intake, a personal feed aggregator.";
@ -64,8 +48,7 @@ in
description = "Extra packages available to this user and their intake service."; description = "Extra packages available to this user and their intake service.";
}; };
}; };
} });
);
}; };
}; };
}; };
@ -79,9 +62,7 @@ in
# Assign each user an internal port for their personal intake instance # Assign each user an internal port for their personal intake instance
enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users; enabledUsers = filterAttrs (userName: userCfg: userCfg.enable) intakeCfg.users;
enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers; enabledUserNames = mapAttrsToList (userName: userCfg: userName) enabledUsers;
userPortList = imap1 (i: userName: { userPortList = imap1 (i: userName: { ${userName} = i + intakeCfg.internalPortStart; }) enabledUserNames;
${userName} = i + intakeCfg.internalPortStart;
}) enabledUserNames;
userPort = foldl (acc: val: acc // val) {} userPortList; userPort = foldl (acc: val: acc // val) {} userPortList;
# To avoid polluting PATH with httpd programs, define an htpasswd wrapper # To avoid polluting PATH with httpd programs, define an htpasswd wrapper
@ -92,8 +73,7 @@ in
# File locations # File locations
intakeDir = "/etc/intake"; intakeDir = "/etc/intake";
intakePwd = "${intakeDir}/htpasswd"; intakePwd = "${intakeDir}/htpasswd";
in in {
{
# Apply the overlay so intake is included in pkgs. # Apply the overlay so intake is included in pkgs.
nixpkgs.overlays = [ flake.overlays.default ]; nixpkgs.overlays = [ flake.overlays.default ];
@ -118,13 +98,12 @@ in
users.users = users.users =
let let
addPackagesToUser = userName: { addPackagesToUser = userName: {
${userName}.packages = [ ${userName}.packages =
htpasswdWrapper [ htpasswdWrapper intake ]
intake ++ intakeCfg.extraPackages
] ++ intakeCfg.extraPackages ++ intakeCfg.users.${userName}.extraPackages; ++ intakeCfg.users.${userName}.extraPackages;
}; };
in in mkMerge (map addPackagesToUser enabledUserNames);
mkMerge (map addPackagesToUser enabledUserNames);
# Enable cron # Enable cron
services.cron.enable = true; services.cron.enable = true;
@ -132,9 +111,7 @@ in
# Define a user service for each configured user # Define a user service for each configured user
systemd.services = systemd.services =
let let
runScript = runScript = userName: pkgs.writeShellScript "intake-run.sh" ''
userName:
pkgs.writeShellScript "intake-run.sh" ''
# Add the setuid wrapper directory so `crontab` is accessible # Add the setuid wrapper directory so `crontab` is accessible
export PATH="${config.security.wrapperDir}:$PATH" export PATH="${config.security.wrapperDir}:$PATH"
${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}} ${pythonEnv}/bin/intake run -d /home/${userName}/.local/share/intake --port ${toString userPort.${userName}}
@ -154,8 +131,7 @@ in
enable = userCfg.enable; enable = userCfg.enable;
}; };
}; };
in in mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
mkMerge (mapAttrsToList userServiceConfig intakeCfg.users);
# Define an nginx reverse proxy to request auth # Define an nginx reverse proxy to request auth
services.nginx = mkIf (enabledUsers != {}) { services.nginx = mkIf (enabledUsers != {}) {
@ -166,13 +142,11 @@ in
proxyPass = "http://127.0.0.1:$target_port"; proxyPass = "http://127.0.0.1:$target_port";
basicAuthFile = intakePwd; basicAuthFile = intakePwd;
}; };
extraConfig = foldl (acc: val: acc + val) "" ( extraConfig = foldl (acc: val: acc + val) "" (mapAttrsToList (userName: port: ''
mapAttrsToList (userName: port: ''
if ($remote_user = "${userName}") { if ($remote_user = "${userName}") {
set $target_port ${toString port}; set $target_port ${toString port};
} }
'') userPort '') userPort);
);
}; };
}; };
}; };

View File

@ -1,6 +1,6 @@
[project] [project]
name = "intake" name = "intake"
version = "1.3.1" version = "1.1.0"
[project.scripts] [project.scripts]
intake = "intake.cli:main" intake = "intake.cli:main"

View File

@ -1,9 +1,10 @@
(import ( (import
let (
lock = builtins.fromJSON (builtins.readFile ./flake.lock); let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in
in
fetchTarball { fetchTarball {
url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; url = "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz";
sha256 = lock.nodes.flake-compat.locked.narHash; sha256 = lock.nodes.flake-compat.locked.narHash;
} }
) { src = ./.; }).shellNix )
{ src = ./.; }
).shellNix

View File

@ -2,7 +2,6 @@
"demo": [ "demo": [
"demo_basic_callback", "demo_basic_callback",
"demo_logging", "demo_logging",
"demo_raw_sh", "demo_raw_sh"
"demo_oncreate"
] ]
} }

View File

@ -1,5 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import Generator, Callable from typing import List, Callable
import pytest import pytest
@ -14,16 +14,15 @@ def clean_source(source_path: Path):
@pytest.fixture @pytest.fixture
def using_source() -> Generator[Callable, None, LocalSource]: def using_source() -> Callable:
test_data = Path(__file__).parent test_data = Path(__file__).parent
sources: list[Path] = [] sources: List[Path] = []
def _using_source(name: str): def _using_source(name: str):
source_path = test_data / name source_path = test_data / name
clean_source(source_path) clean_source(source_path)
sources.append(source_path) sources.append(source_path)
return LocalSource(test_data, name) return LocalSource(test_data, name)
yield _using_source yield _using_source
for source_path in sources: for source_path in sources:

View File

@ -1,9 +1,7 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"args": [ "exe": "true"
"true"
]
} }
} }
} }

View File

@ -1,20 +1,20 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"fetch" "fetch"
] ]
}, },
"increment": { "increment": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"increment" "increment"
] ]
}, },
"decrement": { "decrement": {
"exe": "./increment.py",
"args": [ "args": [
"./increment.py",
"decrement" "decrement"
] ]
} }

View File

@ -1,8 +1,8 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "python3",
"args": [ "args": [
"python3",
"update.py" "update.py"
] ]
} }

View File

@ -1,22 +0,0 @@
{
"action": {
"fetch": {
"args": [
"./update.py",
"fetch"
]
},
"update": {
"args": [
"./update.py",
"update"
]
},
"on_create": {
"args": [
"./update.py",
"update"
]
}
}
}

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python3
import argparse, json, sys, time
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(
json.dumps(
{
"id": str(int(time.time())),
"title": "Title has not been updated",
"action": {
"update": 0,
},
}
)
)
if args.action == "update" or args.action == "on_create":
item = sys.stdin.readline()
item = json.loads(item)
item["action"]["update"] += 1
item["title"] = f"Updated {item['action']['update']} times"
print(json.dumps(item))

View File

@ -1,8 +1,8 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"exe": "sh",
"args": [ "args": [
"sh",
"-c", "-c",
"echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}" "echo {\\\"id\\\": \\\"$(date +%Y-%m-%d-%H-%M)\\\"}"
] ]

View File

@ -1,10 +1,8 @@
{ {
"action": { "action": {
"fetch": { "fetch": {
"args": [ "exe": "./update.py",
"./update.py", "args": ["fetch"]
"fetch"
]
} }
} }
} }

View File

@ -1,6 +1,4 @@
import json import json
from pathlib import Path
import tempfile
from intake.source import fetch_items, update_items, LocalSource from intake.source import fetch_items, update_items, LocalSource
@ -10,7 +8,6 @@ def test_default_source(using_source):
fetch = fetch_items(source) fetch = fetch_items(source)
assert len(fetch) == 0 assert len(fetch) == 0
def test_basic_lifecycle(using_source): def test_basic_lifecycle(using_source):
source: LocalSource = using_source("test_inbox") source: LocalSource = using_source("test_inbox")
state = {"inbox": [{"id": "first"}]} state = {"inbox": [{"id": "first"}]}
@ -64,44 +61,3 @@ def test_basic_lifecycle(using_source):
items = list(source.get_all_items()) items = list(source.get_all_items())
assert len(items) == 1 assert len(items) == 1
assert items[0]["id"] == "second" assert items[0]["id"] == "second"
def test_batch():
with tempfile.TemporaryDirectory() as data_dir:
root = Path(data_dir)
source_dir = root / "batching"
source_dir.mkdir()
config_file = source_dir / "intake.json"
sh_args = [
"python",
"-c",
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
]
batch_config = {
"action": {
"fetch": {
"args": sh_args
}
},
"batch": 0
}
config_file.write_text(json.dumps(batch_config))
source = LocalSource(root, source_dir.name)
# batch sets the tts
fetch1 = fetch_items(source)
assert len(fetch1) == 1
update_items(source, fetch1)
item1 = source.get_item(fetch1[0]["id"])
assert "tts" in item1._item
batch_config["batch"] = 3600
config_file.write_text(json.dumps(batch_config))
fetch2 = fetch_items(source)
assert len(fetch2) == 1
update_items(source, fetch2)
item2 = source.get_item(fetch2[0]["id"])
assert "tts" in item2._item
assert item1["id"] != item2["id"]
assert item2.tts_at == item1.tts_at + 3600