Add an Item wrapper object

This commit is contained in:
Tim Van Baak 2023-06-03 20:57:37 -07:00
parent 0de40b1b5c
commit 9407b53c7a
4 changed files with 164 additions and 106 deletions

View File

@ -8,7 +8,7 @@ import time
from flask import Flask, render_template, request, jsonify, abort, redirect, url_for
from intake.source import LocalSource, execute_action
from intake.source import LocalSource, execute_action, Item
# Globals
app = Flask(__name__)
@ -24,18 +24,8 @@ def intake_data_dir() -> Path:
raise Exception("No intake data directory defined")
def item_sort_key(item):
item_date = item.get("time", item.get("created", 0))
return (item_date, item["id"])
def show_item(item):
"""
Whether to show an item based on active and tts.
"""
return item["active"] and (
"tts" not in item or item["created"] + item["tts"] < int(time.time())
)
def item_sort_key(item: Item):
return item.sort_key
@app.template_filter("datetimeformat")
@ -80,9 +70,7 @@ def source_feed(name):
if not source.source_path.exists():
abort(404)
return _sources_feed(
name, [source], show_hidden=request.args.get("hidden", True)
)
return _sources_feed(name, [source], show_hidden=request.args.get("hidden", True))
@app.get("/channel/<string:name>")
@ -98,9 +86,7 @@ def channel_feed(name):
abort(404)
sources = [LocalSource(intake_data_dir(), name) for name in channels[name]]
return _sources_feed(
name, sources, show_hidden=request.args.get("hidden", False)
)
return _sources_feed(name, sources, show_hidden=request.args.get("hidden", False))
def _sources_feed(name: str, sources: List[LocalSource], show_hidden: bool):
@ -113,7 +99,7 @@ def _sources_feed(name: str, sources: List[LocalSource], show_hidden: bool):
item
for source in sources
for item in source.get_all_items()
if show_item(item) or show_hidden
if not item.is_hidden or show_hidden
],
key=item_sort_key,
)
@ -308,21 +294,17 @@ def add_item():
source_path.mkdir()
config_path = source_path / "intake.json"
if not config_path.exists():
config_path.write_text(json.dumps({
"action": {
"fetch": {
"exe": "true"
}
}
}, indent=2))
config_path.write_text(
json.dumps({"action": {"fetch": {"exe": "true"}}}, indent=2)
)
source = LocalSource(source_path.parent, source_path.name)
# Clean up the fields
item = {key: value for key, value in request.form.items() if value}
item["id"] = '{:x}'.format(getrandbits(16 * 4))
fields = {key: value for key, value in request.form.items() if value}
fields["id"] = "{:x}".format(getrandbits(16 * 4))
# TODO: this doesn't support tags or ttX fields correctly
source.new_item(item)
item = Item.create(source, **fields)
source.save_item(item)
return redirect(url_for("source_feed", name="default"))

View File

@ -224,7 +224,10 @@ def cmd_feed(cmd_args):
for name in args.sources
if (data / name / "intake.json").exists()
]
items = [item for source in sources for item in source.get_all_items()]
items = sorted(
[item for source in sources for item in source.get_all_items()],
key=lambda item: item.sort_key,
)
if not items:
print("Feed is empty")
@ -234,7 +237,7 @@ def cmd_feed(cmd_args):
width = min(80, size.columns)
for item in items:
title = item["title"] if "title" in item else ""
title = item.display_title
titles = [title]
while len(titles[-1]) > width - 4:
i = titles[-1][: width - 4].rfind(" ")

View File

@ -2,15 +2,137 @@ from datetime import timedelta
from pathlib import Path
from subprocess import Popen, PIPE, TimeoutExpired
from threading import Thread
from time import time as current_time
from typing import List
import json
import os
import os.path
import time
from intake.types import InvalidConfigException, SourceUpdateException
class Item:
"""
A wrapper for an item object.
"""
def __init__(self, source: "LocalSource", item: dict):
self.source = source
self._item = item
# Methods to allow Item as a drop-in replacement for the item dict itself
def __contains__(self, key):
return self._item.__contains__(key)
def __iter__(self):
return self._item.__iter__
def __getitem__(self, key):
return self._item.__getitem__(key)
def __setitem__(self, key, value):
return self._item.__setitem__(key, value)
def get(self, key, default=None):
return self._item.get(key, default)
@staticmethod
def create(source: "LocalSource", **fields) -> "Item":
if "id" not in fields:
raise KeyError("id")
item = {
"id": fields["id"],
"source": source.source_name,
"created": int(current_time()),
"active": True,
}
for field_name in (
"title",
"author",
"body",
"link",
"time",
"tags",
"tts",
"ttl",
"ttd",
"action",
):
if val := fields.get(field_name):
item[field_name] = val
return Item(source, item)
@property
def display_title(self):
return self._item.get("title", self._item["id"])
@property
def abs_tts(self):
if "tts" not in self._item:
return None
return self._item["created"] + self._item["tts"]
@property
def can_remove(self):
# The time-to-live fields protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonocity.
if "ttl" in self._item:
ttl_date = self._item["created"] + self._item["ttl"]
if ttl_date > current_time():
return False
# The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active.
if "ttd" in self._item:
ttd_date = self._item["created"] + self._item["ttd"]
if ttd_date < current_time():
return True
return not self._item["active"]
@property
def before_tts(self):
return (
"tts" in self._item
and self._item["created"] + self._item["tts"] < current_time()
)
@property
def is_hidden(self):
return not self._item["active"] or self.before_tts
@property
def sort_key(self):
item_date = self._item.get(
"time",
self._item.get(
"created",
),
)
return (item_date, self._item["id"])
def serialize(self, indent=True):
return json.dumps(self._item, indent=2 if indent else None)
def update_from(self, updated: "Item") -> None:
for field in (
"title",
"author",
"body",
"link",
"time",
"tags",
"tts",
"ttl",
"ttd",
):
if field in updated and self[field] != updated[field]:
self[field] = updated[field]
# Actions are not updated since the available actions and associated
# content is left to the action executor to manage.
class LocalSource:
"""
An intake source backed by a filesystem directory.
@ -49,38 +171,25 @@ class LocalSource:
def item_exists(self, item_id) -> bool:
return self.get_item_path(item_id).exists()
def new_item(self, item: dict) -> dict:
# Ensure required fields
if "id" not in item:
raise KeyError("id")
item["source"] = self.source_name
item["active"] = True
item["created"] = int(time.time())
item["title"] = item.get("title", item["id"])
item["tags"] = item.get("tags", [self.source_name])
# All other fields are optiona
self.save_item(item)
return item
def get_item(self, item_id: str) -> dict:
def get_item(self, item_id: str) -> Item:
with self.get_item_path(item_id).open() as f:
return json.load(f)
return Item(self, json.load(f))
def save_item(self, item: dict) -> None:
def save_item(self, item: Item) -> None:
# Write to a tempfile first to avoid losing the item on write failure
item_path = self.get_item_path(item["id"])
tmp_path = item_path.with_name(f"{item_path.name}.tmp")
with tmp_path.open("w") as f:
f.write(json.dumps(item, indent=2))
f.write(item.serialize())
os.rename(tmp_path, item_path)
def delete_item(self, item_id) -> None:
os.remove(self.get_item_path(item_id))
def get_all_items(self) -> List[dict]:
def get_all_items(self) -> List[Item]:
for filepath in self.source_path.iterdir():
if filepath.name.endswith(".item"):
yield json.loads(filepath.read_text(encoding="utf8"))
yield Item(self, json.loads(filepath.read_text(encoding="utf8")))
def _read_stdout(process: Popen, output: list) -> None:
@ -188,7 +297,7 @@ def fetch_items(source: LocalSource, timeout: int = 60) -> List[dict]:
for line in output:
try:
item = json.loads(line)
item = Item.create(source, **json.loads(line))
items.append(item)
except json.JSONDecodeError:
raise SourceUpdateException("invalid json")
@ -202,23 +311,23 @@ def execute_action(
"""
Execute the action for a feed source.
"""
item = source.get_item(item_id)
item: Item = source.get_item(item_id)
output = _execute_source_action(
source, action, json.dumps(item), timedelta(timeout)
source, action, item.serialize(indent=False), timedelta(timeout)
)
if not output:
raise SourceUpdateException("no item")
try:
item = json.loads(output[0])
item = Item(source, json.loads(output[0]))
source.save_item(item)
return item
except json.JSONDecodeError:
raise SourceUpdateException("invalid json")
def update_items(source: LocalSource, fetched_items):
def update_items(source: LocalSource, fetched_items: List[Item]):
"""
Update the source with a batch of new items, doing creations, updates, and
deletions as necessary.
@ -228,8 +337,8 @@ def update_items(source: LocalSource, fetched_items):
print(f"Found {len(prior_ids)} prior items")
# Determine which items are new and which are updates.
new_items = []
upd_items = []
new_items: List[Item] = []
upd_items: List[Item] = []
for item in fetched_items:
if source.item_exists(item["id"]):
upd_items.append(item)
@ -239,60 +348,24 @@ def update_items(source: LocalSource, fetched_items):
# Write all the new items to the source directory.
for item in new_items:
# TODO: support on-create trigger
source.new_item(item)
source.save_item(item)
# Update the other items using the fetched items' values.
for upd_item in upd_items:
old_item = source.get_item(upd_item["id"])
for field in (
"title",
"tags",
"link",
"time",
"author",
"body",
"ttl",
"ttd",
"tts",
):
if field in upd_item and old_item[field] != upd_item[field]:
old_item[field] = upd_item[field]
if "callback" in upd_item:
# Because of the way this update happens, any fields that are set
# in the callback when the item is new will keep their original
# values, as those values reappear in new_item on subsequent
# updates.
old_item["callback"] = {**old_item["callback"], **upd_item["callback"]}
old_item.update_from(upd_item)
source.save_item(old_item)
# Items are removed when they are old (not in the latest fetch) and
# inactive. Some item fields change this basic behavior.
del_count = 0
now = int(time.time())
# now = int(current_time())
upd_ids = [item["id"] for item in upd_items]
old_item_ids = [item_id for item_id in prior_ids if item_id not in upd_ids]
for item_id in old_item_ids:
item = source.get_item(item_id)
remove = not item["active"]
# The time-to-live field protects an item from removal until expiry.
# This is mainly used to avoid old items resurfacing when their source
# cannot guarantee monotonicity.
if "ttl" in item:
ttl_date = item["created"] + item["ttl"]
if ttl_date > now:
continue
# The time-to-die field puts a maximum lifespan on an item, removing it
# even if it is active.
if "ttd" in item:
ttd_date = item["created"] + item["ttd"]
if ttd_date < now:
remove = True
# Items to be removed are deleted.
if remove:
source.delete_item(item["id"])
if source.get_item(item_id).can_remove:
source.delete_item(item_id)
del_count += 1
print(len(new_items), "new,", del_count, "deleted")

View File

@ -127,8 +127,8 @@ var doAction = function (source, itemid, action) {
{% if items %}
{% for item in items %}
<div class="readable-item
{%- if not item.active %} strikethru fade{% endif %}
{%- if item.active and item.tts and item.created + item.tts > now %} fade{% endif -%}
{%- if not item.active %} strikethru{% endif %}
{%- if item.is_hidden %} fade{% endif -%}
" id="{{item.source}}-{{item.id}}">
{% if item.id %}
<button class="item-button" onclick="javascript:deactivate('{{item.source}}', '{{item.id}}')" title="Deactivate">&#10005;</button>
@ -143,7 +143,7 @@ var doAction = function (source, itemid, action) {
{# The item title is a clickable <summary> if there is body content #}
{% if item.body or item.action %}
<details>
<summary><span class="item-title">{{item.title}}</span></summary>
<summary><span class="item-title">{{item.display_title}}</span></summary>
{% if item.body %}
<p>{{item.body|safe}}</p>
{% endif %}
@ -152,7 +152,7 @@ var doAction = function (source, itemid, action) {
{% endfor %}
</details>
{% else %}
<span class="item-title">{{item.title}}</span><br>
<span class="item-title">{{item.display_title}}</span><br>
{% endif %}
{# author/time footer line #}