Add update logic

This commit is contained in:
Tim Van Baak 2023-05-29 16:19:29 -07:00
parent 7afd5e1484
commit fc583186dd
3 changed files with 153 additions and 17 deletions

View File

@ -1 +0,0 @@
HELLO = "WORLD"

View File

@ -4,7 +4,7 @@ import os
import os.path import os.path
import sys import sys
from .source import fetch_items from .source import fetch_items, LocalSource, update_items
from .types import InvalidConfigException, SourceUpdateException from .types import InvalidConfigException, SourceUpdateException
@ -15,11 +15,11 @@ def intake_data_dir() -> Path:
return intake_data return intake_data
def cmd_fetch(cmd_args): def cmd_update(cmd_args):
"""Execute the fetch for a source.""" """Fetch items for a source and update it."""
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="intake fetch", prog="intake update",
description=cmd_fetch.__doc__, description=cmd_update.__doc__,
) )
parser.add_argument( parser.add_argument(
"--base", "--base",
@ -30,12 +30,20 @@ def cmd_fetch(cmd_args):
"--source", "--source",
help="Source name to fetch", help="Source name to fetch",
) )
parser.add_argument(
"--dry-run",
action="store_true",
help="Instead of updating the source, print the fetched items"
)
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
ret = 0 ret = 0
source_path = Path(args.base) / args.source source = LocalSource(Path(args.base), args.source)
try: try:
items = fetch_items(source_path) items = fetch_items(source)
if not args.dry_run:
update_items(source, items)
else:
for item in items: for item in items:
print("Item:", item) print("Item:", item)
except InvalidConfigException as ex: except InvalidConfigException as ex:

View File

@ -1,13 +1,72 @@
from pathlib import Path from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread from threading import Thread
from typing import List
import json import json
import os import os
import os.path import os.path
import time
from .types import InvalidConfigException, SourceUpdateException from .types import InvalidConfigException, SourceUpdateException
class LocalSource:
"""
An intake source backed by a filesystem directory.
"""
def __init__(self, data_path: Path, source_name: str):
self.data_path: Path = data_path
self.source_name = source_name
self.source_path: Path = data_path / source_name
def get_config(self) -> dict:
config_path = self.source_path / "intake.json"
with open(config_path, "r", encoding="utf8") as config_file:
return json.load(config_file)
def get_state_path(self) -> Path:
return (self.source_path / "state").absolute()
def get_item_path(self, item_id: dict) -> Path:
return self.source_path / f"{item_id}.item"
def get_item_ids(self) -> List[str]:
return [
filepath.name[:-5]
for filepath in self.source_path.iterdir()
if filepath.name.endswith(".item")]
def item_exists(self, item_id) -> bool:
return self.get_item_path(item_id).exists()
def new_item(self, item: dict) -> dict:
# Ensure required fields
if "id" not in item:
raise KeyError("id")
item["source"] = self.source_name
item["active"] = True
item["created"] = int(time.time())
item["title"] = item.get("title", item["id"])
item["tags"] = item.get("tags", [self.source_name])
# All other fields are optiona
self.save_item(item)
return item
def get_item(self, item_id: str) -> dict:
with self.get_item_path(item_id).open() as f:
return json.load(f)
def save_item(self, item: dict) -> None:
# Write to a tempfile first to avoid losing the item on write failure
tmp_path = self.source_path / f"{item['id']}.item.tmp"
with tmp_path.open("w") as f:
f.write(json.dumps(item, indent=2))
os.rename(tmp_path, self.get_item_path(item['id']))
def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id))
def read_stdout(process: Popen, outs: list): def read_stdout(process: Popen, outs: list):
""" """
Read the subprocess's stdout into memory. Read the subprocess's stdout into memory.
@ -35,19 +94,17 @@ def read_stderr(process: Popen):
break break
def fetch_items(source_path: Path, update_timeout=60): def fetch_items(source: LocalSource, update_timeout=60):
""" """
Execute the feed source and return the current feed items. Execute the feed source and return the current feed items.
Returns a list of feed items on success. Returns a list of feed items on success.
Throws SourceUpdateException if the feed source update failed. Throws SourceUpdateException if the feed source update failed.
""" """
# Load the source's config to get its update command # Load the source's config to get its update command
config_path = source_path / "intake.json" config = source.get_config()
with open(config_path, "r", encoding="utf8") as config_file:
config = json.load(config_file)
if "fetch" not in config: if "fetch" not in config:
raise InvalidConfigException("Missing exe") raise InvalidConfigException("Missing fetch")
exe_name = config["fetch"]["exe"] exe_name = config["fetch"]["exe"]
exe_args = config["fetch"].get("args", []) exe_args = config["fetch"].get("args", [])
@ -56,7 +113,7 @@ def fetch_items(source_path: Path, update_timeout=60):
exe_env = { exe_env = {
**os.environ.copy(), **os.environ.copy(),
**config.get("env", {}), **config.get("env", {}),
"STATE_PATH": str((source_path / "state").absolute()), "STATE_PATH": str(source.get_state_path()),
} }
# Launch the update command # Launch the update command
@ -65,7 +122,7 @@ def fetch_items(source_path: Path, update_timeout=60):
[exe_name, *exe_args], [exe_name, *exe_args],
stdout=PIPE, stdout=PIPE,
stderr=PIPE, stderr=PIPE,
cwd=source_path, cwd=source.source_path,
env=exe_env, env=exe_env,
encoding="utf8", encoding="utf8",
) )
@ -100,3 +157,75 @@ def fetch_items(source_path: Path, update_timeout=60):
raise SourceUpdateException("invalid json") raise SourceUpdateException("invalid json")
return items return items
def update_items(source: LocalSource, fetched_items):
"""
Update the source with a batch of new items, doing creations, updates, and
deletions as necessary.
"""
# Get a list of item ids that already existed for this source.
prior_ids = source.get_item_ids()
print(f'Found {len(prior_ids)} prior items')
# Determine which items are new and which are updates.
new_items = []
upd_items = []
for item in fetched_items:
if source.item_exists(item["id"]):
upd_items.append(item)
else:
new_items.append(item)
# Write all the new items to the source directory.
for item in new_items:
# TODO: support on-create trigger
source.new_item(item)
# Update the other items using the fetched items' values.
for upd_item in upd_items:
old_item = source.get_item(upd_item["id"])
for field in ("title", "tags", "link", "time", "author", "body", "ttl", "ttd", "tts"):
if field in upd_item and old_item[field] != upd_item[field]:
old_item[field] = upd_item[field]
if "callback" in upd_item:
# Because of the way this update happens, any fields that are set
# in the callback when the item is new will keep their original
# values, as those values reappear in new_item on subsequent
# updates.
old_item['callback'] = {**old_item['callback'], **upd_item['callback']}
# Items are removed when they are old (not in the latest fetch) and
# inactive. Some item fields change this basic behavior.
del_count = 0
now = int(time.time())
upd_ids = [item["id"] for item in upd_items]
old_item_ids = [
item_id for item_id in prior_ids
if item_id not in upd_ids]
for item_id in old_item_ids:
item = source.get_item(item_id)
remove = not item["active"]
# The time-to-live field protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonicity.
if "ttl" in item:
ttl_date = item["created"] + item["ttl"]
if ttl_date > now:
continue
# The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active.
if "ttd" in item:
ttd_date = item["created"] + item["ttd"]
if ttd_date < now:
remove = True
# Items to be removed are deleted.
if remove:
source.delete_item(item["id"])
del_count += 1
print(len(new_items), "new,", del_count, "deleted")