diff --git a/intake/app.py b/intake/app.py index 08430c8..bee5688 100644 --- a/intake/app.py +++ b/intake/app.py @@ -8,7 +8,7 @@ import time from flask import Flask, render_template, request, jsonify, abort, redirect, url_for -from intake.source import LocalSource, execute_action +from intake.source import LocalSource, execute_action, Item # Globals app = Flask(__name__) @@ -24,18 +24,8 @@ def intake_data_dir() -> Path: raise Exception("No intake data directory defined") -def item_sort_key(item): - item_date = item.get("time", item.get("created", 0)) - return (item_date, item["id"]) - - -def show_item(item): - """ - Whether to show an item based on active and tts. - """ - return item["active"] and ( - "tts" not in item or item["created"] + item["tts"] < int(time.time()) - ) +def item_sort_key(item: Item): + return item.sort_key @app.template_filter("datetimeformat") @@ -80,9 +70,7 @@ def source_feed(name): if not source.source_path.exists(): abort(404) - return _sources_feed( - name, [source], show_hidden=request.args.get("hidden", True) - ) + return _sources_feed(name, [source], show_hidden=request.args.get("hidden", True)) @app.get("/channel/") @@ -98,9 +86,7 @@ def channel_feed(name): abort(404) sources = [LocalSource(intake_data_dir(), name) for name in channels[name]] - return _sources_feed( - name, sources, show_hidden=request.args.get("hidden", False) - ) + return _sources_feed(name, sources, show_hidden=request.args.get("hidden", False)) def _sources_feed(name: str, sources: List[LocalSource], show_hidden: bool): @@ -113,7 +99,7 @@ def _sources_feed(name: str, sources: List[LocalSource], show_hidden: bool): item for source in sources for item in source.get_all_items() - if show_item(item) or show_hidden + if not item.is_hidden or show_hidden ], key=item_sort_key, ) @@ -308,21 +294,17 @@ def add_item(): source_path.mkdir() config_path = source_path / "intake.json" if not config_path.exists(): - config_path.write_text(json.dumps({ - "action": { - "fetch": { - "exe": "true" - } - } - }, indent=2)) + config_path.write_text( + json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2) + ) source = LocalSource(source_path.parent, source_path.name) # Clean up the fields - item = {key: value for key, value in request.form.items() if value} - item["id"] = '{:x}'.format(getrandbits(16 * 4)) + fields = {key: value for key, value in request.form.items() if value} + fields["id"] = "{:x}".format(getrandbits(16 * 4)) # TODO: this doesn't support tags or ttX fields correctly - - source.new_item(item) + item = Item.create(source, **fields) + source.save_item(item) return redirect(url_for("source_feed", name="default")) diff --git a/intake/cli.py b/intake/cli.py index 1f5ed97..e5cf258 100644 --- a/intake/cli.py +++ b/intake/cli.py @@ -224,7 +224,10 @@ def cmd_feed(cmd_args): for name in args.sources if (data / name / "intake.json").exists() ] - items = [item for source in sources for item in source.get_all_items()] + items = sorted( + [item for source in sources for item in source.get_all_items()], + key=lambda item: item.sort_key, + ) if not items: print("Feed is empty") @@ -234,7 +237,7 @@ def cmd_feed(cmd_args): width = min(80, size.columns) for item in items: - title = item["title"] if "title" in item else "" + title = item.display_title titles = [title] while len(titles[-1]) > width - 4: i = titles[-1][: width - 4].rfind(" ") diff --git a/intake/source.py b/intake/source.py index 931a4b4..8341622 100755 --- a/intake/source.py +++ b/intake/source.py @@ -2,15 +2,137 @@ from datetime import timedelta from pathlib import Path from subprocess import Popen, PIPE, TimeoutExpired from threading import Thread +from time import time as current_time from typing import List import json import os import os.path -import time from intake.types import InvalidConfigException, SourceUpdateException +class Item: + """ + A wrapper for an item object. + """ + + def __init__(self, source: "LocalSource", item: dict): + self.source = source + self._item = item + + # Methods to allow Item as a drop-in replacement for the item dict itself + def __contains__(self, key): + return self._item.__contains__(key) + + def __iter__(self): + return self._item.__iter__ + + def __getitem__(self, key): + return self._item.__getitem__(key) + + def __setitem__(self, key, value): + return self._item.__setitem__(key, value) + + def get(self, key, default=None): + return self._item.get(key, default) + + @staticmethod + def create(source: "LocalSource", **fields) -> "Item": + if "id" not in fields: + raise KeyError("id") + item = { + "id": fields["id"], + "source": source.source_name, + "created": int(current_time()), + "active": True, + } + for field_name in ( + "title", + "author", + "body", + "link", + "time", + "tags", + "tts", + "ttl", + "ttd", + "action", + ): + if val := fields.get(field_name): + item[field_name] = val + return Item(source, item) + + @property + def display_title(self): + return self._item.get("title", self._item["id"]) + + @property + def abs_tts(self): + if "tts" not in self._item: + return None + return self._item["created"] + self._item["tts"] + + @property + def can_remove(self): + # The time-to-live fields protects an item from removal until expiry. + # This is mainly used to avoid old items resurfacing when their source + # cannot guarantee monotonocity. + if "ttl" in self._item: + ttl_date = self._item["created"] + self._item["ttl"] + if ttl_date > current_time(): + return False + + # The time-to-die field puts a maximum lifespan on an item, removing it + # even if it is active. + if "ttd" in self._item: + ttd_date = self._item["created"] + self._item["ttd"] + if ttd_date < current_time(): + return True + + return not self._item["active"] + + @property + def before_tts(self): + return ( + "tts" in self._item + and self._item["created"] + self._item["tts"] < current_time() + ) + + @property + def is_hidden(self): + return not self._item["active"] or self.before_tts + + @property + def sort_key(self): + item_date = self._item.get( + "time", + self._item.get( + "created", + ), + ) + return (item_date, self._item["id"]) + + def serialize(self, indent=True): + return json.dumps(self._item, indent=2 if indent else None) + + def update_from(self, updated: "Item") -> None: + for field in ( + "title", + "author", + "body", + "link", + "time", + "tags", + "tts", + "ttl", + "ttd", + ): + if field in updated and self[field] != updated[field]: + self[field] = updated[field] + # Actions are not updated since the available actions and associated + # content is left to the action executor to manage. + + class LocalSource: """ An intake source backed by a filesystem directory. @@ -49,38 +171,25 @@ class LocalSource: def item_exists(self, item_id) -> bool: return self.get_item_path(item_id).exists() - def new_item(self, item: dict) -> dict: - # Ensure required fields - if "id" not in item: - raise KeyError("id") - item["source"] = self.source_name - item["active"] = True - item["created"] = int(time.time()) - item["title"] = item.get("title", item["id"]) - item["tags"] = item.get("tags", [self.source_name]) - # All other fields are optiona - self.save_item(item) - return item - - def get_item(self, item_id: str) -> dict: + def get_item(self, item_id: str) -> Item: with self.get_item_path(item_id).open() as f: - return json.load(f) + return Item(self, json.load(f)) - def save_item(self, item: dict) -> None: + def save_item(self, item: Item) -> None: # Write to a tempfile first to avoid losing the item on write failure item_path = self.get_item_path(item["id"]) tmp_path = item_path.with_name(f"{item_path.name}.tmp") with tmp_path.open("w") as f: - f.write(json.dumps(item, indent=2)) + f.write(item.serialize()) os.rename(tmp_path, item_path) def delete_item(self, item_id) -> None: os.remove(self.get_item_path(item_id)) - def get_all_items(self) -> List[dict]: + def get_all_items(self) -> List[Item]: for filepath in self.source_path.iterdir(): if filepath.name.endswith(".item"): - yield json.loads(filepath.read_text(encoding="utf8")) + yield Item(self, json.loads(filepath.read_text(encoding="utf8"))) def _read_stdout(process: Popen, output: list) -> None: @@ -188,7 +297,7 @@ def fetch_items(source: LocalSource, timeout: int = 60) -> List[dict]: for line in output: try: - item = json.loads(line) + item = Item.create(source, **json.loads(line)) items.append(item) except json.JSONDecodeError: raise SourceUpdateException("invalid json") @@ -202,23 +311,23 @@ def execute_action( """ Execute the action for a feed source. """ - item = source.get_item(item_id) + item: Item = source.get_item(item_id) output = _execute_source_action( - source, action, json.dumps(item), timedelta(timeout) + source, action, item.serialize(indent=False), timedelta(timeout) ) if not output: raise SourceUpdateException("no item") try: - item = json.loads(output[0]) + item = Item(source, json.loads(output[0])) source.save_item(item) return item except json.JSONDecodeError: raise SourceUpdateException("invalid json") -def update_items(source: LocalSource, fetched_items): +def update_items(source: LocalSource, fetched_items: List[Item]): """ Update the source with a batch of new items, doing creations, updates, and deletions as necessary. @@ -228,8 +337,8 @@ def update_items(source: LocalSource, fetched_items): print(f"Found {len(prior_ids)} prior items") # Determine which items are new and which are updates. - new_items = [] - upd_items = [] + new_items: List[Item] = [] + upd_items: List[Item] = [] for item in fetched_items: if source.item_exists(item["id"]): upd_items.append(item) @@ -239,60 +348,24 @@ def update_items(source: LocalSource, fetched_items): # Write all the new items to the source directory. for item in new_items: # TODO: support on-create trigger - source.new_item(item) + source.save_item(item) # Update the other items using the fetched items' values. for upd_item in upd_items: old_item = source.get_item(upd_item["id"]) - for field in ( - "title", - "tags", - "link", - "time", - "author", - "body", - "ttl", - "ttd", - "tts", - ): - if field in upd_item and old_item[field] != upd_item[field]: - old_item[field] = upd_item[field] - if "callback" in upd_item: - # Because of the way this update happens, any fields that are set - # in the callback when the item is new will keep their original - # values, as those values reappear in new_item on subsequent - # updates. - old_item["callback"] = {**old_item["callback"], **upd_item["callback"]} + old_item.update_from(upd_item) + source.save_item(old_item) # Items are removed when they are old (not in the latest fetch) and # inactive. Some item fields change this basic behavior. del_count = 0 - now = int(time.time()) + # now = int(current_time()) upd_ids = [item["id"] for item in upd_items] old_item_ids = [item_id for item_id in prior_ids if item_id not in upd_ids] for item_id in old_item_ids: - item = source.get_item(item_id) - remove = not item["active"] - - # The time-to-live field protects an item from removal until expiry. - # This is mainly used to avoid old items resurfacing when their source - # cannot guarantee monotonicity. - if "ttl" in item: - ttl_date = item["created"] + item["ttl"] - if ttl_date > now: - continue - - # The time-to-die field puts a maximum lifespan on an item, removing it - # even if it is active. - if "ttd" in item: - ttd_date = item["created"] + item["ttd"] - if ttd_date < now: - remove = True - - # Items to be removed are deleted. - if remove: - source.delete_item(item["id"]) + if source.get_item(item_id).can_remove: + source.delete_item(item_id) del_count += 1 print(len(new_items), "new,", del_count, "deleted") diff --git a/intake/templates/feed.jinja2 b/intake/templates/feed.jinja2 index 5d1ee25..0e01836 100644 --- a/intake/templates/feed.jinja2 +++ b/intake/templates/feed.jinja2 @@ -127,8 +127,8 @@ var doAction = function (source, itemid, action) { {% if items %} {% for item in items %}
{% if item.id %} @@ -143,7 +143,7 @@ var doAction = function (source, itemid, action) { {# The item title is a clickable if there is body content #} {% if item.body or item.action %}
- {{item.title}} + {{item.display_title}} {% if item.body %}

{{item.body|safe}}

{% endif %} @@ -152,7 +152,7 @@ var doAction = function (source, itemid, action) { {% endfor %}
{% else %} - {{item.title}}
+ {{item.display_title}}
{% endif %} {# author/time footer line #}