Refactor intake_data_dir to core

This commit is contained in:
Tim Van Baak 2023-06-21 15:29:52 -07:00
parent e2b2ee2a00
commit 53740655b6
3 changed files with 55 additions and 53 deletions

View File

@ -7,24 +7,15 @@ import json
import os import os
import time import time
from flask import Flask, render_template, request, jsonify, abort, redirect, url_for from flask import Flask, render_template, request, jsonify, abort, redirect, url_for, current_app
from intake.core import intake_data_dir
from intake.source import LocalSource, execute_action, Item from intake.source import LocalSource, execute_action, Item
# Globals # Globals
app = Flask(__name__) app = Flask(__name__)
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")
def item_sort_key(item: Item): def item_sort_key(item: Item):
return item.sort_key return item.sort_key
@ -71,7 +62,7 @@ def auth_check(route):
@wraps(route) @wraps(route)
def _route(*args, **kwargs): def _route(*args, **kwargs):
data_path = intake_data_dir() data_path: Path = current_app.config["INTAKE_DATA"]
auth_path = data_path / "credentials.json" auth_path = data_path / "credentials.json"
if auth_path.exists(): if auth_path.exists():
if not request.authorization: if not request.authorization:
@ -92,7 +83,7 @@ def root():
""" """
Navigation home page. Navigation home page.
""" """
data_path = intake_data_dir() data_path: Path = current_app.config["INTAKE_DATA"]
sources = [] sources = []
for child in data_path.iterdir(): for child in data_path.iterdir():
@ -118,7 +109,8 @@ def source_feed(name):
""" """
Feed view for a single source. Feed view for a single source.
""" """
source = LocalSource(intake_data_dir(), name) data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, name)
if not source.source_path.exists(): if not source.source_path.exists():
abort(404) abort(404)
@ -131,13 +123,14 @@ def channel_feed(name):
""" """
Feed view for a channel. Feed view for a channel.
""" """
channels_config_path = intake_data_dir() / "channels.json" data_path: Path = current_app.config["INTAKE_DATA"]
channels_config_path = data_path / "channels.json"
if not channels_config_path.exists(): if not channels_config_path.exists():
abort(404) abort(404)
channels = json.loads(channels_config_path.read_text(encoding="utf8")) channels = json.loads(channels_config_path.read_text(encoding="utf8"))
if name not in channels: if name not in channels:
abort(404) abort(404)
sources = [LocalSource(intake_data_dir(), name) for name in channels[name]] sources = [LocalSource(data_path, name) for name in channels[name]]
return _sources_feed(name, sources, show_hidden=get_show_hidden(False)) return _sources_feed(name, sources, show_hidden=get_show_hidden(False))
@ -190,7 +183,8 @@ def _sources_feed(name: str, sources: List[LocalSource], show_hidden: bool):
@app.delete("/item/<string:source_name>/<string:item_id>") @app.delete("/item/<string:source_name>/<string:item_id>")
@auth_check @auth_check
def deactivate(source_name, item_id): def deactivate(source_name, item_id):
source = LocalSource(intake_data_dir(), source_name) data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
item = source.get_item(item_id) item = source.get_item(item_id)
if item["active"]: if item["active"]:
print(f"Deactivating {source_name}/{item_id}") print(f"Deactivating {source_name}/{item_id}")
@ -202,7 +196,8 @@ def deactivate(source_name, item_id):
@app.patch("/item/<string:source_name>/<string:item_id>") @app.patch("/item/<string:source_name>/<string:item_id>")
@auth_check @auth_check
def update(source_name, item_id): def update(source_name, item_id):
source = LocalSource(intake_data_dir(), source_name) data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
item = source.get_item(item_id) item = source.get_item(item_id)
params = request.get_json() params = request.get_json()
if "tts" in params: if "tts" in params:
@ -217,13 +212,14 @@ def update(source_name, item_id):
@app.post("/mass-deactivate/") @app.post("/mass-deactivate/")
@auth_check @auth_check
def mass_deactivate(): def mass_deactivate():
data_path: Path = current_app.config["INTAKE_DATA"]
params = request.get_json() params = request.get_json()
if "items" not in params: if "items" not in params:
print(f"Bad request params: {params}") print(f"Bad request params: {params}")
for info in params.get("items"): for info in params.get("items"):
source = info["source"] source = info["source"]
itemid = info["itemid"] itemid = info["itemid"]
source = LocalSource(intake_data_dir(), source) source = LocalSource(data_path, source)
item = source.get_item(itemid) item = source.get_item(itemid)
if item["active"]: if item["active"]:
print(f"Deactivating {info['source']}/{info['itemid']}") print(f"Deactivating {info['source']}/{info['itemid']}")
@ -235,7 +231,8 @@ def mass_deactivate():
@app.post("/action/<string:source_name>/<string:item_id>/<string:action>") @app.post("/action/<string:source_name>/<string:item_id>/<string:action>")
@auth_check @auth_check
def action(source_name, item_id, action): def action(source_name, item_id, action):
source = LocalSource(intake_data_dir(), source_name) data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, source_name)
item = execute_action(source, item_id, action) item = execute_action(source, item_id, action)
return jsonify(item) return jsonify(item)
@ -246,7 +243,8 @@ def source_edit(name):
""" """
Config editor for a source Config editor for a source
""" """
source = LocalSource(intake_data_dir(), name) data_path: Path = current_app.config["INTAKE_DATA"]
source = LocalSource(data_path, name)
if not source.source_path.exists(): if not source.source_path.exists():
abort(404) abort(404)
@ -301,7 +299,8 @@ def channels_edit():
""" """
Config editor for channels Config editor for channels
""" """
config_path = intake_data_dir() / "channels.json" data_path: Path = current_app.config["INTAKE_DATA"]
config_path = data_path / "channels.json"
# For POST, check if the config is valid # For POST, check if the config is valid
error_message: str = None error_message: str = None
@ -350,7 +349,8 @@ def _parse_channels_config(config_str: str):
@auth_check @auth_check
def add_item(): def add_item():
# Ensure the default source exists # Ensure the default source exists
source_path = intake_data_dir() / "default" data_path: Path = current_app.config["INTAKE_DATA"]
source_path = data_path / "default"
if not source_path.exists(): if not source_path.exists():
source_path.mkdir() source_path.mkdir()
config_path = source_path / "intake.json" config_path = source_path / "intake.json"
@ -390,5 +390,5 @@ def _get_ttx_for_date(dt: datetime) -> int:
def wsgi(): def wsgi():
# init_default_logging() app.config["INTAKE_DATA"] = intake_data_dir()
return app return app

View File

@ -10,16 +10,11 @@ import pwd
import subprocess import subprocess
import sys import sys
from intake.core import intake_data_dir
from intake.source import fetch_items, LocalSource, update_items, execute_action from intake.source import fetch_items, LocalSource, update_items, execute_action
from intake.types import InvalidConfigException, SourceUpdateException from intake.types import InvalidConfigException, SourceUpdateException
def intake_data_dir() -> Path:
home = Path(os.environ["HOME"])
data_home = Path(os.environ.get("XDG_DATA_HOME", home / ".local" / "share"))
intake_data = data_home / "intake"
return intake_data
def cmd_edit(cmd_args): def cmd_edit(cmd_args):
"""Open a source's config for editing.""" """Open a source's config for editing."""
@ -30,7 +25,6 @@ def cmd_edit(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory", help="Path to the intake data directory",
) )
parser.add_argument( parser.add_argument(
@ -40,14 +34,14 @@ def cmd_edit(cmd_args):
help="Source name to edit", help="Source name to edit",
) )
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
data_path: Path = Path(args.data) if args.data else intake_data_dir()
editor_cmd = os.environ.get("EDITOR") editor_cmd = os.environ.get("EDITOR")
if not editor_cmd: if not editor_cmd:
print("Cannot edit, no EDITOR set") print("Cannot edit, no EDITOR set")
return 1 return 1
data = Path(args.data) source_path: Path = data_path / args.source
source_path: Path = data / args.source
if not source_path.exists(): if not source_path.exists():
yn = input("Source does not exist, create? [yN] ") yn = input("Source does not exist, create? [yN] ")
if yn.strip().lower() != "y": if yn.strip().lower() != "y":
@ -69,7 +63,7 @@ def cmd_edit(cmd_args):
) )
# Make a copy of the config # Make a copy of the config
source = LocalSource(data, args.source) source = LocalSource(data_path, args.source)
tmp_path = source.source_path / "intake.json.tmp" tmp_path = source.source_path / "intake.json.tmp"
tmp_path.write_text(json.dumps(source.get_config(), indent=2)) tmp_path.write_text(json.dumps(source.get_config(), indent=2))
@ -102,7 +96,6 @@ def cmd_update(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory containing source directories", help="Path to the intake data directory containing source directories",
) )
parser.add_argument( parser.add_argument(
@ -118,7 +111,8 @@ def cmd_update(cmd_args):
) )
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
source = LocalSource(Path(args.data), args.source) data_path: Path = Path(args.data) if args.data else intake_data_dir()
source = LocalSource(data_path, args.source)
try: try:
items = fetch_items(source) items = fetch_items(source)
if not args.dry_run: if not args.dry_run:
@ -147,7 +141,6 @@ def cmd_action(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory containing source directories", help="Path to the intake data directory containing source directories",
) )
parser.add_argument( parser.add_argument(
@ -170,7 +163,8 @@ def cmd_action(cmd_args):
) )
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
source = LocalSource(Path(args.data), args.source) data_path: Path = Path(args.data) if args.data else intake_data_dir()
source = LocalSource(data_path, args.source)
try: try:
item = execute_action(source, args.item, args.action, 5) item = execute_action(source, args.item, args.action, 5)
print("Item:", item) print("Item:", item)
@ -202,7 +196,6 @@ def cmd_feed(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory", help="Path to the intake data directory",
) )
parser.add_argument( parser.add_argument(
@ -211,21 +204,20 @@ def cmd_feed(cmd_args):
nargs="+", nargs="+",
help="Limit feed to these sources", help="Limit feed to these sources",
) )
parser.add_argument
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
data = Path(args.data) data_path: Path = Path(args.data) if args.data else intake_data_dir()
if not data.exists() and data.is_dir(): if not data_path.exists() and data_path.is_dir():
print("Not a directory:", data) print("Not a directory:", data_path)
return 1 return 1
if not args.sources: if not args.sources:
args.sources = [child.name for child in data.iterdir()] args.sources = [child.name for child in data_path.iterdir()]
sources = [ sources = [
LocalSource(data, name) LocalSource(data_path, name)
for name in args.sources for name in args.sources
if (data / name / "intake.json").exists() if (data_path / name / "intake.json").exists()
] ]
items = sorted( items = sorted(
[item for source in sources for item in source.get_all_items()], [item for source in sources for item in source.get_all_items()],
@ -275,7 +267,6 @@ def cmd_passwd(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory", help="Path to the intake data directory",
) )
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
@ -285,7 +276,8 @@ def cmd_passwd(cmd_args):
print("Could not find htpasswd, cannot update password") print("Could not find htpasswd, cannot update password")
return 1 return 1
creds = Path(args.data) / "credentials.json" data_path: Path = Path(args.data) if args.data else intake_data_dir()
creds = Path(data_path) / "credentials.json"
if not creds.parent.exists(): if not creds.parent.exists():
creds.parent.mkdir(parents=True) creds.parent.mkdir(parents=True)
@ -313,20 +305,18 @@ def cmd_run(cmd_args):
parser.add_argument( parser.add_argument(
"--data", "--data",
"-d", "-d",
default=intake_data_dir(),
help="Path to the intake data directory containing source directories", help="Path to the intake data directory containing source directories",
) )
parser.add_argument("--debug", action="store_true") parser.add_argument("--debug", action="store_true")
parser.add_argument("--port", type=int, default=5000) parser.add_argument("--port", type=int, default=5000)
args = parser.parse_args(cmd_args) args = parser.parse_args(cmd_args)
# Hack: the web server isn't wired up to take this configuration value data_path: Path = Path(args.data) if args.data else intake_data_dir()
# directly, but it will work to stuff it into the first env checked
os.environ["INTAKE_DATA"] = str(args.data)
try: try:
from intake.app import app from intake.app import app
app.config["INTAKE_DATA"] = data_path
print(app.config)
app.run(port=args.port, debug=args.debug) app.run(port=args.port, debug=args.debug)
return 0 return 0
except Exception as ex: except Exception as ex:

12
intake/core.py Normal file
View File

@ -0,0 +1,12 @@
from pathlib import Path
import os
def intake_data_dir() -> Path:
if intake_data := os.environ.get("INTAKE_DATA"):
return Path(intake_data)
if xdg_data_home := os.environ.get("XDG_DATA_HOME"):
return Path(xdg_data_home) / "intake"
if home := os.environ.get("HOME"):
return Path(home) / ".local" / "share" / "intake"
raise Exception("No intake data directory defined")