Add source-level item batching

This commit is contained in:
Tim Van Baak 2024-11-06 19:57:31 -08:00
parent d0780a9fd1
commit ec33495c56
6 changed files with 87 additions and 17 deletions

View File

@ -40,7 +40,8 @@ intake
"env": { "env": {
"...": "..." "...": "..."
}, },
"cron": "* * * * *" "cron": "* * * * *",
"batch": "<number>"
} }
``` ```
@ -50,6 +51,10 @@ Each key under `env` defines an environment variable that will be set when `fetc
If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule. If `cron` is present, it must define a crontab schedule. Intake will automatically create crontab entries to update each source according to its cron schedule.
`batch` may be a number or string of a number. If it is present, items created by the source will be batched via `tts` so that all items created in a single 24-hour window become visible at the same time. Items created with a longer `tts` will keep their `tts`.
The batch window is computed from midnight to midnight UTC, offset by the value of `batch` (in seconds).
## Interface for source programs ## Interface for source programs
Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed. Intake interacts with sources by executing the actions defined in the source's `intake.json`. The `fetch` action is required and used to check for new feed items when `intake update` is executed.

View File

@ -333,6 +333,8 @@ def _parse_source_config(config_str: str):
config["env"] = parsed["env"] config["env"] = parsed["env"]
if "cron" in parsed: if "cron" in parsed:
config["cron"] = parsed["cron"] config["cron"] = parsed["cron"]
if "batch" in parsed:
config["batch"] = parsed["batch"]
return (None, config) return (None, config)

View File

@ -81,26 +81,31 @@ class Item:
# The time-to-live fields protects an item from removal until expiry. # The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source # This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity. # cannot guarantee monotonocity.
if "ttl" in self._item: if "ttl" in self._item and self.ttl_at > current_time():
ttl_date = self._item["created"] + self._item["ttl"]
if ttl_date > current_time():
return False return False
# The time-to-die field puts a maximum lifespan on an item, removing it # The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active. # even if it is active.
if "ttd" in self._item: if "ttd" in self._item and self.ttd_at < current_time():
ttd_date = self._item["created"] + self._item["ttd"]
if ttd_date < current_time():
return True return True
return not self._item["active"] return not self._item["active"]
@property
def tts_at(self):
return self._item["created"] + self._item.get("tts", 0)
@property
def ttl_at(self):
return self._item["created"] + self._item.get("ttl", 0)
@property
def ttd_at(self):
return self._item["created"] + self._item.get("ttd", 0)
@property @property
def before_tts(self): def before_tts(self):
return ( return "tts" in self._item and current_time() < self.tts_at
"tts" in self._item
and current_time() < self._item["created"] + self._item["tts"]
)
@property @property
def is_hidden(self): def is_hidden(self):
@ -358,6 +363,19 @@ def update_items(source: LocalSource, fetched_items: list[Item]):
else: else:
new_items.append(item) new_items.append(item)
# If the source is batched, set the tts on new items to at least the batch tts
if "batch" in config:
try:
batch_adj = int(config["batch"])
now = current_time() - batch_adj
batch_start = now - (now % 86400)
batch_end = batch_start + 86400 + batch_adj
for item in new_items:
min_tts = batch_end - item["created"]
item["tts"] = min(min_tts, item.get("tts", min_tts))
except:
pass
# Write all the new items to the source directory. # Write all the new items to the source directory.
for item in new_items: for item in new_items:
source.save_item(item) source.save_item(item)

View File

@ -178,7 +178,9 @@ var doAction = function (source, itemid, action) {
{% if item.source %}{{item.source}}{% endif %} {% if item.source %}{{item.source}}{% endif %}
{% if item.id %}{{item.id}}{% endif %} {% if item.id %}{{item.id}}{% endif %}
{% if item.created %}{{item.created|datetimeformat}}{% endif %} {% if item.created %}{{item.created|datetimeformat}}{% endif %}
{% if item.ttl %}L{% endif %}{% if item.ttd %}D{% endif %}{% if item.tts %}S{% endif %} {% if item.ttl %}<span title="TTL {{item.ttl_at|datetimeformat}}">[L]</span>{% endif %}
{% if item.ttd %}<span title="TTD {{item.ttd_at|datetimeformat}}">[D]</span>{% endif %}
{% if item.tts %}<span title="TTS {{item.tts_at|datetimeformat}}">[S]</span>{% endif %}
</span> </span>
{% endif %} {% endif %}

View File

@ -1,5 +1,5 @@
from pathlib import Path from pathlib import Path
from typing import List, Callable from typing import Generator, Callable
import pytest import pytest
@ -14,9 +14,9 @@ def clean_source(source_path: Path):
@pytest.fixture @pytest.fixture
def using_source() -> Callable: def using_source() -> Generator[Callable, None, LocalSource]:
test_data = Path(__file__).parent test_data = Path(__file__).parent
sources: List[Path] = [] sources: list[Path] = []
def _using_source(name: str): def _using_source(name: str):
source_path = test_data / name source_path = test_data / name

View File

@ -1,4 +1,6 @@
import json import json
from pathlib import Path
import tempfile
from intake.source import fetch_items, update_items, LocalSource from intake.source import fetch_items, update_items, LocalSource
@ -62,3 +64,44 @@ def test_basic_lifecycle(using_source):
items = list(source.get_all_items()) items = list(source.get_all_items())
assert len(items) == 1 assert len(items) == 1
assert items[0]["id"] == "second" assert items[0]["id"] == "second"
def test_batch():
with tempfile.TemporaryDirectory() as data_dir:
root = Path(data_dir)
source_dir = root / "batching"
source_dir.mkdir()
config_file = source_dir / "intake.json"
sh_args = [
"python",
"-c",
"import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')"
]
batch_config = {
"action": {
"fetch": {
"args": sh_args
}
},
"batch": 0
}
config_file.write_text(json.dumps(batch_config))
source = LocalSource(root, source_dir.name)
# batch sets the tts
fetch1 = fetch_items(source)
assert len(fetch1) == 1
update_items(source, fetch1)
item1 = source.get_item(fetch1[0]["id"])
assert "tts" in item1._item
batch_config["batch"] = 3600
config_file.write_text(json.dumps(batch_config))
fetch2 = fetch_items(source)
assert len(fetch2) == 1
update_items(source, fetch2)
item2 = source.get_item(fetch2[0]["id"])
assert "tts" in item2._item
assert item1["id"] != item2["id"]
assert item2.tts_at == item1.tts_at + 3600