diff --git a/inquisitor/__init__.py b/inquisitor/__init__.py
index d6e633c..dcaacc1 100644
--- a/inquisitor/__init__.py
+++ b/inquisitor/__init__.py
@@ -1,2 +1,2 @@
from inquisitor.configs import CACHE_PATH
-from inquisitor.templates import cache_image, LinearCrawler, RedditScraper
\ No newline at end of file
+from inquisitor.templates import cache_image, LinearCrawler, RedditScraper
diff --git a/inquisitor/__main__.py b/inquisitor/__main__.py
index d49c138..d92dc90 100644
--- a/inquisitor/__main__.py
+++ b/inquisitor/__main__.py
@@ -1,2 +1,3 @@
from cli import main
-main()
\ No newline at end of file
+
+main()
diff --git a/inquisitor/app.py b/inquisitor/app.py
index 00d4b54..e18d8b5 100644
--- a/inquisitor/app.py
+++ b/inquisitor/app.py
@@ -8,13 +8,14 @@ from flask import Flask, render_template, request, jsonify, abort, redirect, url
# Application imports
from inquisitor.configs import (
- DUNGEON_PATH,
- SOURCES_PATH,
- CACHE_PATH,
- subfeeds,
- get_subfeed_overrides,
- logger,
- init_default_logging)
+ DUNGEON_PATH,
+ SOURCES_PATH,
+ CACHE_PATH,
+ subfeeds,
+ get_subfeed_overrides,
+ logger,
+ init_default_logging,
+)
from inquisitor import sources, loader, timestamp
# Globals
@@ -22,173 +23,202 @@ app = Flask(__name__)
def make_query_link(text, wl, bl):
- wlp = "only=" + ",".join(wl)
- blp = "not=" + ",".join(bl)
- params = [p for p in (wlp, blp) if not p.endswith("=")]
- query = "?{}".format("&".join(params))
- return '{0}'.format(text, query)
+ wlp = "only=" + ",".join(wl)
+ blp = "not=" + ",".join(bl)
+ params = [p for p in (wlp, blp) if not p.endswith("=")]
+ query = "?{}".format("&".join(params))
+ return '{0}'.format(text, query)
+
@app.template_filter("datetimeformat")
def datetimeformat(value):
- return timestamp.stamp_to_readable(value) if value is not None else ""
+ return timestamp.stamp_to_readable(value) if value is not None else ""
+
@app.route("/")
def root():
- return redirect(url_for('feed'))
+ return redirect(url_for("feed"))
+
@app.route("/feed/")
def feed():
- return feed_for_sources(source_names=None)
+ return feed_for_sources(source_names=None)
+
@app.route("/feed//")
def subfeed(feed_name):
- # Check for and apply subfeed overrides
- subfeed_overrides = get_subfeed_overrides()
- subfeed_config = subfeed_overrides or subfeeds or {}
+ # Check for and apply subfeed overrides
+ subfeed_overrides = get_subfeed_overrides()
+ subfeed_config = subfeed_overrides or subfeeds or {}
- # The built-in inquisitor subfeed contains sources not in another subfeed
- if feed_name == 'inquisitor':
- all_sources = os.listdir(DUNGEON_PATH)
- for subfeed, sources in subfeed_config.items():
- for source_name in sources:
- if source_name in all_sources:
- all_sources.remove(source_name)
- return feed_for_sources(all_sources)
+ # The built-in inquisitor subfeed contains sources not in another subfeed
+ if feed_name == "inquisitor":
+ all_sources = os.listdir(DUNGEON_PATH)
+ for subfeed, sources in subfeed_config.items():
+ for source_name in sources:
+ if source_name in all_sources:
+ all_sources.remove(source_name)
+ return feed_for_sources(all_sources)
+
+ if feed_name not in subfeed_config:
+ return abort(404)
+ return feed_for_sources(subfeed_config[feed_name])
- if feed_name not in subfeed_config:
- return abort(404)
- return feed_for_sources(subfeed_config[feed_name])
def feed_for_sources(source_names):
- # Determine exclusion filters
- filters = []
- wl_param = request.args.get('only')
- wl = wl_param.split(",") if wl_param else []
- bl_param = request.args.get('not')
- bl = bl_param.split(",") if bl_param else []
- if wl:
- filters.append(lambda item: not any([tag in wl for tag in item['tags']]))
- if bl:
- filters.append(lambda item: any([tag in bl for tag in item['tags']]))
+ # Determine exclusion filters
+ filters = []
+ wl_param = request.args.get("only")
+ wl = wl_param.split(",") if wl_param else []
+ bl_param = request.args.get("not")
+ bl = bl_param.split(",") if bl_param else []
+ if wl:
+ filters.append(lambda item: not any([tag in wl for tag in item["tags"]]))
+ if bl:
+ filters.append(lambda item: any([tag in bl for tag in item["tags"]]))
- # Get all active+filtered items and all active tags
- total = 0
- items, errors = loader.load_active_items(source_names)
- active_items = []
- active_tags = {}
- for item in items:
- if item['active']:
- for tag in item['tags']:
- if tag not in active_tags: active_tags[tag] = 0
- active_tags[tag] += 1
- # active_tags |= set(item['tags'])
- total += 1
- if not any(map(lambda f: f(item), filters)):
- active_items.append(item)
- # Sort items by time
- active_items.sort(key=lambda i: i['time'] if 'time' in i and i['time'] else i['created'] if 'created' in i and i['created'] else 0)
+ # Get all active+filtered items and all active tags
+ total = 0
+ items, errors = loader.load_active_items(source_names)
+ active_items = []
+ active_tags = {}
+ for item in items:
+ if item["active"]:
+ for tag in item["tags"]:
+ if tag not in active_tags:
+ active_tags[tag] = 0
+ active_tags[tag] += 1
+ # active_tags |= set(item['tags'])
+ total += 1
+ if not any(map(lambda f: f(item), filters)):
+ active_items.append(item)
+ # Sort items by time
+ active_items.sort(
+ key=lambda i: i["time"]
+ if "time" in i and i["time"]
+ else i["created"]
+ if "created" in i and i["created"]
+ else 0
+ )
- logger.info("Returning {} of {} items".format(len(active_items), total))
- if errors:
- read_ex = {
- 'title': 'Read errors',
- 'body': "{}
".format("\n\n".join(errors)),
- 'created': None,
- }
- active_items.insert(0, read_ex)
+ logger.info("Returning {} of {} items".format(len(active_items), total))
+ if errors:
+ read_ex = {
+ "title": "Read errors",
+ "body": "{}
".format("\n\n".join(errors)),
+ "created": None,
+ }
+ active_items.insert(0, read_ex)
- if total > 0:
- # Create the feed control item
- link_table = ["{0} | {1} | | |
".format(
- total, make_query_link("all", [], []))]
- for tag, count in sorted(active_tags.items(), key=lambda i: i[0].lower()):
- links = [count]
- links.append(make_query_link(tag, [tag], []))
- if tag in wl:
- new_wl = [t for t in wl if t != tag]
- links.append(make_query_link("-only", new_wl, bl))
- else:
- new_bl = [t for t in bl if t != tag]
- links.append(make_query_link("+only", wl + [tag], new_bl))
- if tag in bl:
- new_bl = [t for t in bl if t != tag]
- links.append(make_query_link("-not", wl, new_bl))
- else:
- new_wl = [t for t in wl if t != tag]
- links.append(make_query_link("+not", new_wl, bl + [tag]))
- link_table.append("{0} | {1} | {2} | {3} |
".format(*links))
- body = ''.format("\n".join(link_table))
+ if total > 0:
+ # Create the feed control item
+ link_table = [
+ "{0} | {1} | | |
".format(
+ total, make_query_link("all", [], [])
+ )
+ ]
+ for tag, count in sorted(active_tags.items(), key=lambda i: i[0].lower()):
+ links = [count]
+ links.append(make_query_link(tag, [tag], []))
+ if tag in wl:
+ new_wl = [t for t in wl if t != tag]
+ links.append(make_query_link("-only", new_wl, bl))
+ else:
+ new_bl = [t for t in bl if t != tag]
+ links.append(make_query_link("+only", wl + [tag], new_bl))
+ if tag in bl:
+ new_bl = [t for t in bl if t != tag]
+ links.append(make_query_link("-not", wl, new_bl))
+ else:
+ new_wl = [t for t in wl if t != tag]
+ links.append(make_query_link("+not", new_wl, bl + [tag]))
+ link_table.append(
+ "{0} | {1} | {2} | {3} |
".format(
+ *links
+ )
+ )
+ body = ''.format("\n".join(link_table))
- feed_control = {
- 'title': 'Feed Control [{}/{}]'.format(len(active_items), total),
- 'body': body,
- }
- active_items.insert(0, feed_control)
+ feed_control = {
+ "title": "Feed Control [{}/{}]".format(len(active_items), total),
+ "body": body,
+ }
+ active_items.insert(0, feed_control)
- selection = active_items[:100]
+ selection = active_items[:100]
- return render_template("feed.jinja2",
- items=selection,
- mdeac=[
- {'source': item['source'], 'itemid': item['id']}
- for item in selection
- if 'id' in item])
+ return render_template(
+ "feed.jinja2",
+ items=selection,
+ mdeac=[
+ {"source": item["source"], "itemid": item["id"]}
+ for item in selection
+ if "id" in item
+ ],
+ )
-@app.route("/deactivate/", methods=['POST'])
+
+@app.route("/deactivate/", methods=["POST"])
def deactivate():
- params = request.get_json()
- if 'source' not in params and 'itemid' not in params:
- logger.error("Bad request params: {}".format(params))
- item = loader.load_item(params['source'], params['itemid'])
- if item['active']:
- logger.debug(f"Deactivating {params['source']}/{params['itemid']}")
- item['active'] = False
- return jsonify({'active': item['active']})
+ params = request.get_json()
+ if "source" not in params and "itemid" not in params:
+ logger.error("Bad request params: {}".format(params))
+ item = loader.load_item(params["source"], params["itemid"])
+ if item["active"]:
+ logger.debug(f"Deactivating {params['source']}/{params['itemid']}")
+ item["active"] = False
+ return jsonify({"active": item["active"]})
-@app.route("/punt/", methods=['POST'])
+
+@app.route("/punt/", methods=["POST"])
def punt():
- params = request.get_json()
- if 'source' not in params and 'itemid' not in params:
- logger.error("Bad request params: {}".format(params))
- item = loader.load_item(params['source'], params['itemid'])
- tomorrow = datetime.now() + timedelta(days=1)
- morning = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 6, 0, 0)
- til_then = morning.timestamp() - item['created']
- item['tts'] = til_then
- return jsonify(item.item)
+ params = request.get_json()
+ if "source" not in params and "itemid" not in params:
+ logger.error("Bad request params: {}".format(params))
+ item = loader.load_item(params["source"], params["itemid"])
+ tomorrow = datetime.now() + timedelta(days=1)
+ morning = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 6, 0, 0)
+ til_then = morning.timestamp() - item["created"]
+ item["tts"] = til_then
+ return jsonify(item.item)
-@app.route("/mass-deactivate/", methods=['POST'])
+
+@app.route("/mass-deactivate/", methods=["POST"])
def mass_deactivate():
- params = request.get_json()
- if 'items' not in params:
- logger.error("Bad request params: {}".format(params))
- for info in params.get('items', []):
- source = info['source']
- itemid = info['itemid']
- item = loader.load_item(source, itemid)
- if item['active']:
- logger.debug(f"Deactivating {info['source']}/{info['itemid']}")
- item['active'] = False
- return jsonify({})
+ params = request.get_json()
+ if "items" not in params:
+ logger.error("Bad request params: {}".format(params))
+ for info in params.get("items", []):
+ source = info["source"]
+ itemid = info["itemid"]
+ item = loader.load_item(source, itemid)
+ if item["active"]:
+ logger.debug(f"Deactivating {info['source']}/{info['itemid']}")
+ item["active"] = False
+ return jsonify({})
-@app.route("/callback/", methods=['POST'])
+
+@app.route("/callback/", methods=["POST"])
def callback():
- params = request.get_json()
- if 'source' not in params and 'itemid' not in params:
- logger.error("Bad request params: {}".format(params))
- logger.info('Executing callback for {}/{}'.format(params['source'], params['itemid']))
- sources.item_callback(params['source'], params['itemid'])
- return jsonify({})
+ params = request.get_json()
+ if "source" not in params and "itemid" not in params:
+ logger.error("Bad request params: {}".format(params))
+ logger.info(
+ "Executing callback for {}/{}".format(params["source"], params["itemid"])
+ )
+ sources.item_callback(params["source"], params["itemid"])
+ return jsonify({})
-@app.route('/cache/')
+
+@app.route("/cache/")
def cache(cache_path):
- path = os.path.join(CACHE_PATH, cache_path)
- if not os.path.isfile(path):
- return abort(404)
- with open(path, 'rb') as f:
- return f.read()
+ path = os.path.join(CACHE_PATH, cache_path)
+ if not os.path.isfile(path):
+ return abort(404)
+ with open(path, "rb") as f:
+ return f.read()
def wsgi():
- init_default_logging()
- return app
+ init_default_logging()
+ return app
diff --git a/inquisitor/cli.py b/inquisitor/cli.py
index 63a5bef..67c79f4 100644
--- a/inquisitor/cli.py
+++ b/inquisitor/cli.py
@@ -11,288 +11,324 @@ from inquisitor.configs import logger, DUNGEON_PATH, SOURCES_PATH, add_logging_h
def command_test(args):
- """Echo config file values."""
- from inquisitor.configs.resolver import (
- config_path,
- CONFIG_DATA, data_path,
- CONFIG_SOURCES, source_path,
- CONFIG_CACHE, cache_path,
- CONFIG_LOGFILE, log_file,
- CONFIG_VERBOSE, is_verbose,
- CONFIG_SUBFEEDS, subfeeds,
- )
- subfeeds = '; '.join(
- '{0}: {1}'.format(
- sf_name,
- ' '.join(sf_sources)
- )
- for sf_name, sf_sources
- in subfeeds.items()
- ) if subfeeds else ''
- print(f'Inquisitor configured from {config_path}')
- print(f' {CONFIG_DATA} = {data_path}')
- print(f' {CONFIG_SOURCES} = {source_path}')
- print(f' {CONFIG_CACHE} = {cache_path}')
- print(f' {CONFIG_LOGFILE} = {log_file}')
- print(f' {CONFIG_VERBOSE} = {is_verbose}')
- print(f' {CONFIG_SUBFEEDS} = {subfeeds}')
- return 0
+ """Echo config file values."""
+ from inquisitor.configs.resolver import (
+ config_path,
+ CONFIG_DATA,
+ data_path,
+ CONFIG_SOURCES,
+ source_path,
+ CONFIG_CACHE,
+ cache_path,
+ CONFIG_LOGFILE,
+ log_file,
+ CONFIG_VERBOSE,
+ is_verbose,
+ CONFIG_SUBFEEDS,
+ subfeeds,
+ )
+
+ subfeeds = (
+ "; ".join(
+ "{0}: {1}".format(sf_name, " ".join(sf_sources))
+ for sf_name, sf_sources in subfeeds.items()
+ )
+ if subfeeds
+ else ""
+ )
+ print(f"Inquisitor configured from {config_path}")
+ print(f" {CONFIG_DATA} = {data_path}")
+ print(f" {CONFIG_SOURCES} = {source_path}")
+ print(f" {CONFIG_CACHE} = {cache_path}")
+ print(f" {CONFIG_LOGFILE} = {log_file}")
+ print(f" {CONFIG_VERBOSE} = {is_verbose}")
+ print(f" {CONFIG_SUBFEEDS} = {subfeeds}")
+ return 0
def command_update(args):
- """Fetch and store new items from the specified sources."""
- parser = argparse.ArgumentParser(
- prog="inquisitor update",
- description=command_update.__doc__,
- add_help=False)
- parser.add_argument("source",
- nargs="*",
- help="Sources to update.")
- args = parser.parse_args(args)
+ """Fetch and store new items from the specified sources."""
+ parser = argparse.ArgumentParser(
+ prog="inquisitor update", description=command_update.__doc__, add_help=False
+ )
+ parser.add_argument("source", nargs="*", help="Sources to update.")
+ args = parser.parse_args(args)
- if len(args.source) == 0:
- parser.print_help()
- return 0
- if not os.path.isdir(DUNGEON_PATH):
- logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
- return -1
- if not os.path.isdir(SOURCES_PATH):
- logger.error("Couldn't find sources. Set INQUISITOR_SOURCES or cd to parent folder of ./sources")
+ if len(args.source) == 0:
+ parser.print_help()
+ return 0
+ if not os.path.isdir(DUNGEON_PATH):
+ logger.error(
+ "Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon"
+ )
+ return -1
+ if not os.path.isdir(SOURCES_PATH):
+ logger.error(
+ "Couldn't find sources. Set INQUISITOR_SOURCES or cd to parent folder of ./sources"
+ )
- # Update sources
- from inquisitor.sources import update_sources
- update_sources(*args.source)
- return 0
+ # Update sources
+ from inquisitor.sources import update_sources
+
+ update_sources(*args.source)
+ return 0
def command_deactivate(args):
- """Deactivate all items in the specified dungeon cells."""
- parser = argparse.ArgumentParser(
- prog="inquisitor deactivate",
- description=command_deactivate.__doc__,
- add_help=False)
- parser.add_argument("source",
- nargs="*",
- help="Cells to deactivate.")
- parser.add_argument("--tag",
- help="Only deactivate items with this tag")
- parser.add_argument("--title",
- help="Only deactivate items with titles containing this substring")
- args = parser.parse_args(args)
+ """Deactivate all items in the specified dungeon cells."""
+ parser = argparse.ArgumentParser(
+ prog="inquisitor deactivate",
+ description=command_deactivate.__doc__,
+ add_help=False,
+ )
+ parser.add_argument("source", nargs="*", help="Cells to deactivate.")
+ parser.add_argument("--tag", help="Only deactivate items with this tag")
+ parser.add_argument(
+ "--title", help="Only deactivate items with titles containing this substring"
+ )
+ args = parser.parse_args(args)
- if len(args.source) == 0:
- parser.print_help()
- return 0
- if not os.path.isdir(DUNGEON_PATH):
- logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
- return -1
+ if len(args.source) == 0:
+ parser.print_help()
+ return 0
+ if not os.path.isdir(DUNGEON_PATH):
+ logger.error(
+ "Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon"
+ )
+ return -1
- # Deactivate all items in each source.
- from inquisitor.loader import load_items
- for source_name in args.source:
- path = os.path.join(DUNGEON_PATH, source_name)
- if not os.path.isdir(path):
- logger.warning("'{}' is not an extant source".format(source_name))
- count = 0
- items, _ = load_items(source_name)
- for item in items.values():
- if args.tag and args.tag not in item['tags']:
- continue
- if args.title and args.title not in item['title']:
- continue
- if item['active']:
- item['active'] = False
- count += 1
- logger.info("Deactivated {} items in '{}'".format(count, source_name))
+ # Deactivate all items in each source.
+ from inquisitor.loader import load_items
- return 0
+ for source_name in args.source:
+ path = os.path.join(DUNGEON_PATH, source_name)
+ if not os.path.isdir(path):
+ logger.warning("'{}' is not an extant source".format(source_name))
+ count = 0
+ items, _ = load_items(source_name)
+ for item in items.values():
+ if args.tag and args.tag not in item["tags"]:
+ continue
+ if args.title and args.title not in item["title"]:
+ continue
+ if item["active"]:
+ item["active"] = False
+ count += 1
+ logger.info("Deactivated {} items in '{}'".format(count, source_name))
+
+ return 0
def command_add(args):
- """Creates an item."""
- parser = argparse.ArgumentParser(
- prog="inquisitor add",
- description=command_add.__doc__,
- add_help=False)
- parser.add_argument("--id", help="String")
- parser.add_argument("--source", help="String")
- parser.add_argument("--title", help="String")
- parser.add_argument("--link", help="URL")
- parser.add_argument("--time", type=int, help="Unix timestmap")
- parser.add_argument("--author", help="String")
- parser.add_argument("--body", help="HTML")
- parser.add_argument("--tags", help="Comma-separated list")
- parser.add_argument("--ttl", type=int, help="Cleanup protection in seconds")
- parser.add_argument("--ttd", type=int, help="Cleanup force in seconds")
- parser.add_argument("--tts", type=int, help="Display delay in seconds")
- parser.add_argument("--create", action="store_true", help="Create source if it doesn't exist")
- args = parser.parse_args(args)
+ """Creates an item."""
+ parser = argparse.ArgumentParser(
+ prog="inquisitor add", description=command_add.__doc__, add_help=False
+ )
+ parser.add_argument("--id", help="String")
+ parser.add_argument("--source", help="String")
+ parser.add_argument("--title", help="String")
+ parser.add_argument("--link", help="URL")
+ parser.add_argument("--time", type=int, help="Unix timestmap")
+ parser.add_argument("--author", help="String")
+ parser.add_argument("--body", help="HTML")
+ parser.add_argument("--tags", help="Comma-separated list")
+ parser.add_argument("--ttl", type=int, help="Cleanup protection in seconds")
+ parser.add_argument("--ttd", type=int, help="Cleanup force in seconds")
+ parser.add_argument("--tts", type=int, help="Display delay in seconds")
+ parser.add_argument(
+ "--create", action="store_true", help="Create source if it doesn't exist"
+ )
+ args = parser.parse_args(args)
- if not args.title:
- parser.print_help()
- return 0
- if not os.path.isdir(DUNGEON_PATH):
- logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
- return -1
+ if not args.title:
+ parser.print_help()
+ return 0
+ if not os.path.isdir(DUNGEON_PATH):
+ logger.error(
+ "Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon"
+ )
+ return -1
- source = args.source or 'inquisitor'
- cell_path = os.path.join(DUNGEON_PATH, source)
- if args.create:
- from inquisitor.sources import ensure_cell
- ensure_cell(source)
- elif not os.path.isdir(cell_path):
- logger.error("Source '{}' does not exist".format(source))
- return -1
+ source = args.source or "inquisitor"
+ cell_path = os.path.join(DUNGEON_PATH, source)
+ if args.create:
+ from inquisitor.sources import ensure_cell
- item = {
- 'id': args.id or '{:x}'.format(random.getrandbits(16 * 4)),
- 'source': source,
- }
- if args.title: item['title'] = str(args.title)
- if args.link: item['link'] = str(args.link)
- if args.time: item['time'] = int(args.time)
- if args.author: item['author'] = str(args.author)
- if args.body: item['body'] = str(args.body)
- if args.tags: item['tags'] = [str(tag) for tag in args.tags.split(",")]
- if args.ttl: item['ttl'] = int(args.ttl)
- if args.ttd: item['ttd'] = int(args.ttd)
- if args.tts: item['tts'] = int(args.tts)
+ ensure_cell(source)
+ elif not os.path.isdir(cell_path):
+ logger.error("Source '{}' does not exist".format(source))
+ return -1
- from inquisitor.loader import new_item
- saved_item = new_item(source, item)
- logger.info(saved_item)
+ item = {
+ "id": args.id or "{:x}".format(random.getrandbits(16 * 4)),
+ "source": source,
+ }
+ if args.title:
+ item["title"] = str(args.title)
+ if args.link:
+ item["link"] = str(args.link)
+ if args.time:
+ item["time"] = int(args.time)
+ if args.author:
+ item["author"] = str(args.author)
+ if args.body:
+ item["body"] = str(args.body)
+ if args.tags:
+ item["tags"] = [str(tag) for tag in args.tags.split(",")]
+ if args.ttl:
+ item["ttl"] = int(args.ttl)
+ if args.ttd:
+ item["ttd"] = int(args.ttd)
+ if args.tts:
+ item["tts"] = int(args.tts)
+
+ from inquisitor.loader import new_item
+
+ saved_item = new_item(source, item)
+ logger.info(saved_item)
def command_feed(args):
- """Print the current feed."""
- if not os.path.isdir(DUNGEON_PATH):
- logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
- return -1
+ """Print the current feed."""
+ if not os.path.isdir(DUNGEON_PATH):
+ logger.error(
+ "Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon"
+ )
+ return -1
- import shutil
- from inquisitor import loader
- from inquisitor import timestamp
+ import shutil
+ from inquisitor import loader
+ from inquisitor import timestamp
- items, errors = loader.load_active_items(source_names=None)
- if not items and not errors:
- print("Feed is empty")
- return 0
+ items, errors = loader.load_active_items(source_names=None)
+ if not items and not errors:
+ print("Feed is empty")
+ return 0
- if errors:
- items.insert(0, {
- 'title': '{} read errors: {}'.format(len(errors), ' '.join(errors)),
- 'body': "\n".join(errors)
- })
+ if errors:
+ items.insert(
+ 0,
+ {
+ "title": "{} read errors: {}".format(len(errors), " ".join(errors)),
+ "body": "\n".join(errors),
+ },
+ )
- size = shutil.get_terminal_size((80, 20))
- width = min(80, size.columns)
+ size = shutil.get_terminal_size((80, 20))
+ width = min(80, size.columns)
- for item in items:
- title = item['title'] if 'title' in item else ""
- titles = [title]
- while len(titles[-1]) > width - 4:
- i = titles[-1][:width - 4].rfind(' ')
- titles = titles[:-1] + [titles[-1][:i].strip(), titles[-1][i:].strip()]
- print('+' + (width - 2) * '-' + '+')
- for title in titles:
- print("| {0:<{1}} |".format(title, width - 4))
- print("|{0:<{1}}|".format("", width - 2))
- info1 = ""
- if 'author' in title and item['author']:
- info1 += item['author'] + " "
- if 'time' in item and item['time']:
- info1 += timestamp.stamp_to_readable(item['time'])
- print("| {0:<{1}} |".format(info1, width - 4))
- created = timestamp.stamp_to_readable(item['created']) if 'created' in item else ""
- info2 = "{0} {1} {2}".format(
- item.get('source', ''), item.get('id', ''), created)
- print("| {0:<{1}} |".format(info2, width - 4))
- print('+' + (width - 2) * '-' + '+')
- print()
+ for item in items:
+ title = item["title"] if "title" in item else ""
+ titles = [title]
+ while len(titles[-1]) > width - 4:
+ i = titles[-1][: width - 4].rfind(" ")
+ titles = titles[:-1] + [titles[-1][:i].strip(), titles[-1][i:].strip()]
+ print("+" + (width - 2) * "-" + "+")
+ for title in titles:
+ print("| {0:<{1}} |".format(title, width - 4))
+ print("|{0:<{1}}|".format("", width - 2))
+ info1 = ""
+ if "author" in title and item["author"]:
+ info1 += item["author"] + " "
+ if "time" in item and item["time"]:
+ info1 += timestamp.stamp_to_readable(item["time"])
+ print("| {0:<{1}} |".format(info1, width - 4))
+ created = (
+ timestamp.stamp_to_readable(item["created"]) if "created" in item else ""
+ )
+ info2 = "{0} {1} {2}".format(
+ item.get("source", ""), item.get("id", ""), created
+ )
+ print("| {0:<{1}} |".format(info2, width - 4))
+ print("+" + (width - 2) * "-" + "+")
+ print()
def command_run(args):
- """Run the default Flask server."""
- parser = argparse.ArgumentParser(
- prog="inquisitor run",
- description=command_run.__doc__,
- add_help=False)
- parser.add_argument("--debug", action="store_true")
- parser.add_argument("--port", type=int, default=5000)
- args = parser.parse_args(args)
+ """Run the default Flask server."""
+ parser = argparse.ArgumentParser(
+ prog="inquisitor run", description=command_run.__doc__, add_help=False
+ )
+ parser.add_argument("--debug", action="store_true")
+ parser.add_argument("--port", type=int, default=5000)
+ args = parser.parse_args(args)
- try:
- from inquisitor.app import app
- app.run(port=args.port, debug=args.debug)
- return 0
- except Exception as e:
- logger.error(e)
- return -1
+ try:
+ from inquisitor.app import app
+
+ app.run(port=args.port, debug=args.debug)
+ return 0
+ except Exception as e:
+ logger.error(e)
+ return -1
def command_help(args):
- """Print this help message and exit."""
- print_usage()
- return 0
+ """Print this help message and exit."""
+ print_usage()
+ return 0
def nocommand(args):
- print("command required")
- return 0
+ print("command required")
+ return 0
def main():
- """CLI entry point"""
- # Enable piping
- from signal import signal, SIGPIPE, SIG_DFL
- signal(SIGPIPE, SIG_DFL)
+ """CLI entry point"""
+ # Enable piping
+ from signal import signal, SIGPIPE, SIG_DFL
- # Collect the commands from this module
- import inquisitor.cli
- commands = {
- name[8:] : func
- for name, func in vars(inquisitor.cli).items()
- if name.startswith('command_')
- }
- descriptions = "\n".join([
- "- {0}: {1}".format(name, func.__doc__)
- for name, func in commands.items()])
+ signal(SIGPIPE, SIG_DFL)
- # Set up the parser
- parser = argparse.ArgumentParser(
- description="Available commands:\n{}\n".format(descriptions),
- formatter_class=argparse.RawDescriptionHelpFormatter,
- add_help=False)
- parser.add_argument("command",
- nargs="?",
- default="help",
- help="The command to execute",
- choices=commands,
- metavar="command")
- parser.add_argument("args",
- nargs=argparse.REMAINDER,
- help="Command arguments",
- metavar="args")
- parser.add_argument("-v",
- action="store_true",
- dest="verbose",
- help="Enable debug logging")
+ # Collect the commands from this module
+ import inquisitor.cli
- # Extract the usage print for command_help
- global print_usage
- print_usage = parser.print_help
+ commands = {
+ name[8:]: func
+ for name, func in vars(inquisitor.cli).items()
+ if name.startswith("command_")
+ }
+ descriptions = "\n".join(
+ ["- {0}: {1}".format(name, func.__doc__) for name, func in commands.items()]
+ )
- args = parser.parse_args()
+ # Set up the parser
+ parser = argparse.ArgumentParser(
+ description="Available commands:\n{}\n".format(descriptions),
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ add_help=False,
+ )
+ parser.add_argument(
+ "command",
+ nargs="?",
+ default="help",
+ help="The command to execute",
+ choices=commands,
+ metavar="command",
+ )
+ parser.add_argument(
+ "args", nargs=argparse.REMAINDER, help="Command arguments", metavar="args"
+ )
+ parser.add_argument(
+ "-v", action="store_true", dest="verbose", help="Enable debug logging"
+ )
- # Initialize a console logger
- add_logging_handler(verbose=args.verbose, log_filename=None)
+ # Extract the usage print for command_help
+ global print_usage
+ print_usage = parser.print_help
- # Execute command
- try:
- command = commands.get(args.command, nocommand)
- sys.exit(command(args.args))
- except BrokenPipeError:
- # See https://docs.python.org/3.10/library/signal.html#note-on-sigpipe
- devnull = os.open(os.devnull, os.O_WRONLY)
- os.dup2(devnull, sys.stdout.fileno())
- sys.exit(1)
+ args = parser.parse_args()
+
+ # Initialize a console logger
+ add_logging_handler(verbose=args.verbose, log_filename=None)
+
+ # Execute command
+ try:
+ command = commands.get(args.command, nocommand)
+ sys.exit(command(args.args))
+ except BrokenPipeError:
+ # See https://docs.python.org/3.10/library/signal.html#note-on-sigpipe
+ devnull = os.open(os.devnull, os.O_WRONLY)
+ os.dup2(devnull, sys.stdout.fileno())
+ sys.exit(1)
diff --git a/inquisitor/configs/__init__.py b/inquisitor/configs/__init__.py
index ee31800..54015e6 100644
--- a/inquisitor/configs/__init__.py
+++ b/inquisitor/configs/__init__.py
@@ -1,10 +1,5 @@
from .resolver import data_path as DUNGEON_PATH
from .resolver import source_path as SOURCES_PATH
from .resolver import cache_path as CACHE_PATH
-from .resolver import (
- logger,
- subfeeds)
-from .resolver import (
- add_logging_handler,
- init_default_logging,
- get_subfeed_overrides)
\ No newline at end of file
+from .resolver import logger, subfeeds
+from .resolver import add_logging_handler, init_default_logging, get_subfeed_overrides
diff --git a/inquisitor/configs/resolver.py b/inquisitor/configs/resolver.py
index fab7f5c..8b2d5a3 100644
--- a/inquisitor/configs/resolver.py
+++ b/inquisitor/configs/resolver.py
@@ -4,174 +4,172 @@ import logging
# Constants governing config resolution:
# Path to the config file, containing key-value pairs of the other settings
-CONFIG_ENVVAR = 'INQUISITOR_CONFIG'
-DEFAULT_CONFIG_PATH = '/etc/inquisitor.conf'
+CONFIG_ENVVAR = "INQUISITOR_CONFIG"
+DEFAULT_CONFIG_PATH = "/etc/inquisitor.conf"
# Path to the folder where items are stored
-CONFIG_DATA = 'DataPath'
-DEFAULT_DATA_PATH = '/var/inquisitor/data/'
+CONFIG_DATA = "DataPath"
+DEFAULT_DATA_PATH = "/var/inquisitor/data/"
# Path to the folder where source modules are stored
-CONFIG_SOURCES = 'SourcePath'
-DEFAULT_SOURCES_PATH = '/var/inquisitor/sources/'
+CONFIG_SOURCES = "SourcePath"
+DEFAULT_SOURCES_PATH = "/var/inquisitor/sources/"
# Path to the folder where cached files are stored
-CONFIG_CACHE = 'CachePath'
-DEFAULT_CACHE_PATH = '/var/inquisitor/cache/'
+CONFIG_CACHE = "CachePath"
+DEFAULT_CACHE_PATH = "/var/inquisitor/cache/"
# Path to a log file where logging will be redirected
-CONFIG_LOGFILE = 'LogFile'
+CONFIG_LOGFILE = "LogFile"
DEFAULT_LOG_FILE = None
# Whether logging is verbose
-CONFIG_VERBOSE = 'Verbose'
-DEFAULT_VERBOSITY = 'false'
+CONFIG_VERBOSE = "Verbose"
+DEFAULT_VERBOSITY = "false"
# Subfeed source lists, with each subfeed config separated by lines and
# sources within a subfeed separated by spaces
-CONFIG_SUBFEEDS = 'Subfeeds'
+CONFIG_SUBFEEDS = "Subfeeds"
DEFAULT_SUBFEEDS = None
-SUBFEED_CONFIG_FILE = 'subfeeds.conf'
+SUBFEED_CONFIG_FILE = "subfeeds.conf"
def read_config_file(config_path):
- """
- Reads a config file of key-value pairs, where non-blank lines are
- either comments beginning with the character '#' or keys and values
- separated by the character '='.
- """
- # Parse the config file into key-value pairs
- if not os.path.isfile(config_path):
+ """
+ Reads a config file of key-value pairs, where non-blank lines are
+ either comments beginning with the character '#' or keys and values
+ separated by the character '='.
+ """
+ # Parse the config file into key-value pairs
+ if not os.path.isfile(config_path):
- raise FileNotFoundError(f'No config file found at {config_path}, try setting {CONFIG_ENVVAR}')
- accumulated_configs = {}
- current_key = None
- with open(config_path, 'r', encoding='utf8') as cfg:
- line_no = 0
- for line in cfg:
- line_no += 1
- # Skip blank lines and comments
- if not line.strip() or line.lstrip().startswith('#'):
- continue
- # Accumulate config keyvalue pairs
- if '=' in line:
- # "key = value" begins a new keyvalue pair
- current_key, value = line.split('=', maxsplit=1)
- current_key = current_key.strip()
- accumulated_configs[current_key] = value.strip()
- else:
- # If there's no '=' and no previous key, throw
- if not current_key:
- raise ValueError(f'Invalid config format on line {line_no}')
- else:
- accumulated_configs[current_key] += '\n' + line.strip()
+ raise FileNotFoundError(
+ f"No config file found at {config_path}, try setting {CONFIG_ENVVAR}"
+ )
+ accumulated_configs = {}
+ current_key = None
+ with open(config_path, "r", encoding="utf8") as cfg:
+ line_no = 0
+ for line in cfg:
+ line_no += 1
+ # Skip blank lines and comments
+ if not line.strip() or line.lstrip().startswith("#"):
+ continue
+ # Accumulate config keyvalue pairs
+ if "=" in line:
+ # "key = value" begins a new keyvalue pair
+ current_key, value = line.split("=", maxsplit=1)
+ current_key = current_key.strip()
+ accumulated_configs[current_key] = value.strip()
+ else:
+ # If there's no '=' and no previous key, throw
+ if not current_key:
+ raise ValueError(f"Invalid config format on line {line_no}")
+ else:
+ accumulated_configs[current_key] += "\n" + line.strip()
- return accumulated_configs
+ return accumulated_configs
def parse_subfeed_value(value):
- sf_defs = [sf.strip() for sf in value.split('\n') if sf.strip()]
- subfeeds = {}
- for sf_def in sf_defs:
- if ':' not in sf_def:
- raise ValueError(f'Invalid subfeed definition: {sf_def}')
- sf_name, sf_sources = sf_def.split(':', maxsplit=1)
- sf_sources = sf_sources.split()
- subfeeds[sf_name.strip()] = [source.strip() for source in sf_sources]
- return subfeeds
+ sf_defs = [sf.strip() for sf in value.split("\n") if sf.strip()]
+ subfeeds = {}
+ for sf_def in sf_defs:
+ if ":" not in sf_def:
+ raise ValueError(f"Invalid subfeed definition: {sf_def}")
+ sf_name, sf_sources = sf_def.split(":", maxsplit=1)
+ sf_sources = sf_sources.split()
+ subfeeds[sf_name.strip()] = [source.strip() for source in sf_sources]
+ return subfeeds
# Read envvar for config file location, with fallback to default
-config_path = os.path.abspath(
- os.environ.get(CONFIG_ENVVAR) or
- DEFAULT_CONFIG_PATH
-)
+config_path = os.path.abspath(os.environ.get(CONFIG_ENVVAR) or DEFAULT_CONFIG_PATH)
configs = read_config_file(config_path)
# Extract and validate config values
data_path = configs.get(CONFIG_DATA) or DEFAULT_DATA_PATH
if not os.path.isabs(data_path):
- raise ValueError(f'Non-absolute data path: {data_path}')
+ raise ValueError(f"Non-absolute data path: {data_path}")
if not os.path.isdir(data_path):
- raise FileNotFoundError(f'Cannot find directory {data_path}')
+ raise FileNotFoundError(f"Cannot find directory {data_path}")
source_path = configs.get(CONFIG_SOURCES) or DEFAULT_SOURCES_PATH
if not os.path.isabs(source_path):
- raise ValueError(f'Non-absolute source path: {source_path}')
+ raise ValueError(f"Non-absolute source path: {source_path}")
if not os.path.isdir(source_path):
- raise FileNotFoundError(f'Cannot find directory {source_path}')
+ raise FileNotFoundError(f"Cannot find directory {source_path}")
cache_path = configs.get(CONFIG_CACHE) or DEFAULT_CACHE_PATH
if not os.path.isabs(cache_path):
- raise ValueError(f'Non-absolute cache path: {cache_path}')
+ raise ValueError(f"Non-absolute cache path: {cache_path}")
if not os.path.isdir(cache_path):
- raise FileNotFoundError(f'Cannot find directory {cache_path}')
+ raise FileNotFoundError(f"Cannot find directory {cache_path}")
log_file = configs.get(CONFIG_LOGFILE) or DEFAULT_LOG_FILE
if log_file and not os.path.isabs(log_file):
- raise ValueError(f'Non-absolute log file path: {log_file}')
+ raise ValueError(f"Non-absolute log file path: {log_file}")
is_verbose = configs.get(CONFIG_VERBOSE) or DEFAULT_VERBOSITY
-if is_verbose != 'true' and is_verbose != 'false':
- raise ValueError(f'Invalid verbose value (must be "true" or "false"): {is_verbose}')
-is_verbose = (is_verbose == 'true')
+if is_verbose != "true" and is_verbose != "false":
+ raise ValueError(f'Invalid verbose value (must be "true" or "false"): {is_verbose}')
+is_verbose = is_verbose == "true"
subfeeds = configs.get(CONFIG_SUBFEEDS) or DEFAULT_SUBFEEDS
if subfeeds:
- subfeeds = parse_subfeed_value(subfeeds)
+ subfeeds = parse_subfeed_value(subfeeds)
def get_subfeed_overrides():
- """
- Check for and parse the secondary subfeed configuration file
- """
- path = os.path.join(source_path, SUBFEED_CONFIG_FILE)
- if not os.path.isfile(path):
- return None
- overrides = read_config_file(path)
- if CONFIG_SUBFEEDS not in overrides:
- return None
- value = overrides[CONFIG_SUBFEEDS]
- if not value:
- return None
- parsed_value = parse_subfeed_value(value)
- return parsed_value
+ """
+ Check for and parse the secondary subfeed configuration file
+ """
+ path = os.path.join(source_path, SUBFEED_CONFIG_FILE)
+ if not os.path.isfile(path):
+ return None
+ overrides = read_config_file(path)
+ if CONFIG_SUBFEEDS not in overrides:
+ return None
+ value = overrides[CONFIG_SUBFEEDS]
+ if not value:
+ return None
+ parsed_value = parse_subfeed_value(value)
+ return parsed_value
# Set up logging
logger = logging.getLogger("inquisitor")
logger.setLevel(logging.DEBUG)
+
def add_logging_handler(verbose, log_filename):
- """
- Adds a logging handler according to the given settings
- """
- log_format = (
- '[{asctime}] [{levelname}:{filename}:{lineno}] {message}'
- if verbose else
- '[{levelname}] {message}'
- )
- formatter = logging.Formatter(log_format, style='{')
+ """
+ Adds a logging handler according to the given settings
+ """
+ log_format = (
+ "[{asctime}] [{levelname}:{filename}:{lineno}] {message}"
+ if verbose
+ else "[{levelname}] {message}"
+ )
+ formatter = logging.Formatter(log_format, style="{")
- log_level = (
- logging.DEBUG
- if verbose else
- logging.INFO
- )
- handler = (
- logging.handlers.RotatingFileHandler(
- log_filename,
- encoding='utf8',
- maxBytes=2**22, # 4 MB per log file
- backupCount=4) # 16 MB total
- if log_filename else
- logging.StreamHandler()
- )
- handler.setFormatter(formatter)
- handler.setLevel(log_level)
+ log_level = logging.DEBUG if verbose else logging.INFO
+ handler = (
+ logging.handlers.RotatingFileHandler(
+ log_filename,
+ encoding="utf8",
+ maxBytes=2**22, # 4 MB per log file
+ backupCount=4,
+ ) # 16 MB total
+ if log_filename
+ else logging.StreamHandler()
+ )
+ handler.setFormatter(formatter)
+ handler.setLevel(log_level)
+
+ logger.addHandler(handler)
- logger.addHandler(handler)
def init_default_logging():
- add_logging_handler(is_verbose, log_file)
+ add_logging_handler(is_verbose, log_file)
diff --git a/inquisitor/error.py b/inquisitor/error.py
index 0270e4a..94341ac 100644
--- a/inquisitor/error.py
+++ b/inquisitor/error.py
@@ -8,19 +8,20 @@ from inquisitor.configs import DUNGEON_PATH, logger
logger = logging.getLogger("inquisitor")
+
def as_item(title, body=None):
- iid = '{:x}'.format(random.getrandbits(16 * 4))
- item = {
- 'id': iid,
- 'source': 'inquisitor',
- 'title': title,
- 'active': True,
- 'created': timestamp.now(),
- 'tags': ['inquisitor', 'error'],
- }
- if body is not None:
- item['body'] = '{}
'.format(body)
- path = os.path.join(DUNGEON_PATH, 'inquisitor', iid + ".item")
- logger.error(json.dumps(item))
- with open(path, 'w') as f:
- f.write(json.dumps(item, indent=2))
+ iid = "{:x}".format(random.getrandbits(16 * 4))
+ item = {
+ "id": iid,
+ "source": "inquisitor",
+ "title": title,
+ "active": True,
+ "created": timestamp.now(),
+ "tags": ["inquisitor", "error"],
+ }
+ if body is not None:
+ item["body"] = "{}
".format(body)
+ path = os.path.join(DUNGEON_PATH, "inquisitor", iid + ".item")
+ logger.error(json.dumps(item))
+ with open(path, "w") as f:
+ f.write(json.dumps(item, indent=2))
diff --git a/inquisitor/loader.py b/inquisitor/loader.py
index 414620c..0632c5d 100644
--- a/inquisitor/loader.py
+++ b/inquisitor/loader.py
@@ -7,180 +7,180 @@ from inquisitor import error
from inquisitor import timestamp
-class WritethroughDict():
- """A wrapper for a dictionary saved to the file system."""
+class WritethroughDict:
+ """A wrapper for a dictionary saved to the file system."""
- @staticmethod
- def create(path, item):
- """
- Creates a writethrough dictionary from a dictionary in memory and
- initializes a file to save it.
- """
- if os.path.isfile(path):
- raise FileExistsError(path)
- wd = WritethroughDict(path, item)
- wd.flush()
- return wd
+ @staticmethod
+ def create(path, item):
+ """
+ Creates a writethrough dictionary from a dictionary in memory and
+ initializes a file to save it.
+ """
+ if os.path.isfile(path):
+ raise FileExistsError(path)
+ wd = WritethroughDict(path, item)
+ wd.flush()
+ return wd
- @staticmethod
- def load(path):
- """
- Creates a writethrough dictionary from an existing file in the
- file system.
- """
- if not os.path.isfile(path):
- raise FileNotFoundError(path)
- with open(path) as f:
- item = json.load(f)
- return WritethroughDict(path, item)
+ @staticmethod
+ def load(path):
+ """
+ Creates a writethrough dictionary from an existing file in the
+ file system.
+ """
+ if not os.path.isfile(path):
+ raise FileNotFoundError(path)
+ with open(path) as f:
+ item = json.load(f)
+ return WritethroughDict(path, item)
- def __init__(self, path, item):
- self.path = path
- self.item = item
+ def __init__(self, path, item):
+ self.path = path
+ self.item = item
- def __getitem__(self, key):
- return self.item[key]
+ def __getitem__(self, key):
+ return self.item[key]
- def get(self, *args, **kwargs):
- return self.item.get(*args, **kwargs)
+ def get(self, *args, **kwargs):
+ return self.item.get(*args, **kwargs)
- def __setitem__(self, key, value):
- self.item[key] = value
- self.flush()
+ def __setitem__(self, key, value):
+ self.item[key] = value
+ self.flush()
- def __contains__(self, key):
- return key in self.item
+ def __contains__(self, key):
+ return key in self.item
- def __repr__(self):
- return repr(self.item)
+ def __repr__(self):
+ return repr(self.item)
- def __str__(self):
- return str(self.item)
+ def __str__(self):
+ return str(self.item)
- def flush(self):
- s = json.dumps(self.item, indent=2)
- with open(self.path, 'w', encoding="utf8") as f:
- f.write(s)
+ def flush(self):
+ s = json.dumps(self.item, indent=2)
+ with open(self.path, "w", encoding="utf8") as f:
+ f.write(s)
def load_state(source_name):
- """Loads the state dictionary for a source."""
- state_path = os.path.join(DUNGEON_PATH, source_name, "state")
- return WritethroughDict.load(state_path)
+ """Loads the state dictionary for a source."""
+ state_path = os.path.join(DUNGEON_PATH, source_name, "state")
+ return WritethroughDict.load(state_path)
def load_item(source_name, item_id):
- """Loads an item from a source."""
- item_path = os.path.join(DUNGEON_PATH, source_name, f'{item_id}.item')
- return WritethroughDict.load(item_path)
+ """Loads an item from a source."""
+ item_path = os.path.join(DUNGEON_PATH, source_name, f"{item_id}.item")
+ return WritethroughDict.load(item_path)
def item_exists(source_name, item_id):
- """
- Checks for the existence of an item.
- """
- item_path = os.path.join(DUNGEON_PATH, source_name, f'{item_id}.item')
- return os.path.isfile(item_path)
+ """
+ Checks for the existence of an item.
+ """
+ item_path = os.path.join(DUNGEON_PATH, source_name, f"{item_id}.item")
+ return os.path.isfile(item_path)
def get_item_ids(cell_name):
- """
- Returns a list of item ids in the given cell.
- """
- cell_path = os.path.join(DUNGEON_PATH, cell_name)
- return [
- filename[:-5]
- for filename in os.listdir(cell_path)
- if filename.endswith('.item')
- ]
+ """
+ Returns a list of item ids in the given cell.
+ """
+ cell_path = os.path.join(DUNGEON_PATH, cell_name)
+ return [
+ filename[:-5]
+ for filename in os.listdir(cell_path)
+ if filename.endswith(".item")
+ ]
def new_item(source_name, item):
- """
- Creates a new item with the fields in the provided dictionary.
- Initializes other fields to their default values.
- """
- # id is required
- if 'id' not in item:
- raise Exception(f'Cannot create item with no id. Value = {item}')
+ """
+ Creates a new item with the fields in the provided dictionary.
+ Initializes other fields to their default values.
+ """
+ # id is required
+ if "id" not in item:
+ raise Exception(f"Cannot create item with no id. Value = {item}")
- # source must be filled in, so if it is absent it is auto-populated with
- # source_name. Note: this allows sources to fill in a different source.
- if 'source' not in item:
- item['source'] = source_name
+ # source must be filled in, so if it is absent it is auto-populated with
+ # source_name. Note: this allows sources to fill in a different source.
+ if "source" not in item:
+ item["source"] = source_name
- # active is forced to True for new items
- item['active'] = True
+ # active is forced to True for new items
+ item["active"] = True
- # created is forced to the current timestamp
- item['created'] = timestamp.now()
+ # created is forced to the current timestamp
+ item["created"] = timestamp.now()
- # title is auto-populated with the id if missing
- if 'title' not in item:
- item['title'] = item['id']
+ # title is auto-populated with the id if missing
+ if "title" not in item:
+ item["title"] = item["id"]
- # tags is auto-populated if missing (not if empty!)
- if 'tags' not in item:
- item['tags'] = [source_name]
+ # tags is auto-populated if missing (not if empty!)
+ if "tags" not in item:
+ item["tags"] = [source_name]
- # All other fields are optional.
- item_path = os.path.join(DUNGEON_PATH, item['source'], f'{item["id"]}.item')
- return WritethroughDict.create(item_path, item)
+ # All other fields are optional.
+ item_path = os.path.join(DUNGEON_PATH, item["source"], f'{item["id"]}.item')
+ return WritethroughDict.create(item_path, item)
def delete_item(source_name, item_id):
- """
- Delete an item.
- """
- item_path = os.path.join(DUNGEON_PATH, source_name, f'{item_id}.item')
- os.remove(item_path)
+ """
+ Delete an item.
+ """
+ item_path = os.path.join(DUNGEON_PATH, source_name, f"{item_id}.item")
+ os.remove(item_path)
def load_items(source_name):
- """
- Returns a map of ids to items and a list of unreadable files.
- """
- cell_path = os.path.join(DUNGEON_PATH, source_name)
- items = {}
- errors = []
- for filename in os.listdir(cell_path):
- if filename.endswith('.item'):
- try:
- item = load_item(source_name, filename[:-5])
- items[item['id']] = item
- except Exception:
- errors.append(filename)
- return items, errors
+ """
+ Returns a map of ids to items and a list of unreadable files.
+ """
+ cell_path = os.path.join(DUNGEON_PATH, source_name)
+ items = {}
+ errors = []
+ for filename in os.listdir(cell_path):
+ if filename.endswith(".item"):
+ try:
+ item = load_item(source_name, filename[:-5])
+ items[item["id"]] = item
+ except Exception:
+ errors.append(filename)
+ return items, errors
def load_active_items(source_names):
- """
- Returns a list of active items and a list of unreadable items. If
- `source_names` is defined, load only from sources in that list.
- """
- items = []
- errors = []
- now = timestamp.now()
- check_list = source_names or os.listdir(DUNGEON_PATH)
- for source_name in check_list:
- source_path = os.path.join(DUNGEON_PATH, source_name)
- if not os.path.isdir(source_path):
- logger.warning(f'Skipping nonexistent source {source_name}')
- continue
- for filename in os.listdir(source_path):
- if not filename.endswith('.item'):
- continue
- try:
- item = load_item(source_name, filename[:-5])
- # The time-to-show field hides items until an expiry date.
- if 'tts' in item:
- tts_date = item['created'] + item['tts']
- if now < tts_date:
- continue
- # Don't show inactive items
- if not item['active']:
- continue
- items.append(item)
- except Exception:
- errors.append(filename)
- return items, errors
+ """
+ Returns a list of active items and a list of unreadable items. If
+ `source_names` is defined, load only from sources in that list.
+ """
+ items = []
+ errors = []
+ now = timestamp.now()
+ check_list = source_names or os.listdir(DUNGEON_PATH)
+ for source_name in check_list:
+ source_path = os.path.join(DUNGEON_PATH, source_name)
+ if not os.path.isdir(source_path):
+ logger.warning(f"Skipping nonexistent source {source_name}")
+ continue
+ for filename in os.listdir(source_path):
+ if not filename.endswith(".item"):
+ continue
+ try:
+ item = load_item(source_name, filename[:-5])
+ # The time-to-show field hides items until an expiry date.
+ if "tts" in item:
+ tts_date = item["created"] + item["tts"]
+ if now < tts_date:
+ continue
+ # Don't show inactive items
+ if not item["active"]:
+ continue
+ items.append(item)
+ except Exception:
+ errors.append(filename)
+ return items, errors
diff --git a/inquisitor/sources.py b/inquisitor/sources.py
index dd45f7a..0a25f1d 100644
--- a/inquisitor/sources.py
+++ b/inquisitor/sources.py
@@ -10,229 +10,234 @@ from inquisitor.configs import SOURCES_PATH, DUNGEON_PATH, logger
USE_NEWEST = (
- 'title',
- 'tags',
- 'link',
- 'time'
- 'author',
- 'body',
- 'ttl',
- 'ttd',
- 'tts',
+ "title",
+ "tags",
+ "link",
+ "time" "author",
+ "body",
+ "ttl",
+ "ttd",
+ "tts",
)
class InquisitorStubSource:
- """A dummy source-like object for clearing out ad-hoc inquisitor items"""
- def fetch_new(self, state):
- return []
+ """A dummy source-like object for clearing out ad-hoc inquisitor items"""
+
+ def fetch_new(self, state):
+ return []
def ensure_cell(name):
- """
- Creates a cell in the dungeon. Idempotent.
- """
- cell_path = os.path.join(DUNGEON_PATH, name)
- if not os.path.isdir(cell_path):
- logger.info(f'Creating cell for source "{name}"')
- os.mkdir(cell_path)
- state_path = os.path.join(cell_path, 'state')
- if not os.path.isfile(state_path):
- with open(state_path, 'w', encoding='utf8') as state:
- json.dump({}, state)
+ """
+ Creates a cell in the dungeon. Idempotent.
+ """
+ cell_path = os.path.join(DUNGEON_PATH, name)
+ if not os.path.isdir(cell_path):
+ logger.info(f'Creating cell for source "{name}"')
+ os.mkdir(cell_path)
+ state_path = os.path.join(cell_path, "state")
+ if not os.path.isfile(state_path):
+ with open(state_path, "w", encoding="utf8") as state:
+ json.dump({}, state)
def update_sources(*source_names):
- """
- Attempts to update each given source.
- """
- for source_name in source_names:
- # Import the source
- try:
- source_module = load_source(source_name)
- except Exception:
- error.as_item(
- f'Error importing source "{source_name}"',
- traceback.format_exc())
- continue
+ """
+ Attempts to update each given source.
+ """
+ for source_name in source_names:
+ # Import the source
+ try:
+ source_module = load_source(source_name)
+ except Exception:
+ error.as_item(
+ f'Error importing source "{source_name}"', traceback.format_exc()
+ )
+ continue
- # If it doesn't have a cell yet, create one
- try:
- ensure_cell(source_name)
- except Exception:
- error.as_item(
- f'Error initializing source "{source_name}"',
- traceback.format_exc())
- continue
+ # If it doesn't have a cell yet, create one
+ try:
+ ensure_cell(source_name)
+ except Exception:
+ error.as_item(
+ f'Error initializing source "{source_name}"', traceback.format_exc()
+ )
+ continue
- # Update the source
- try:
- logger.info(f'Updating source "{source_name}"')
- update_source(source_name, source_module)
- except Exception:
- error.as_item(
- f'Error updating source "{source_name}"',
- traceback.format_exc())
+ # Update the source
+ try:
+ logger.info(f'Updating source "{source_name}"')
+ update_source(source_name, source_module)
+ except Exception:
+ error.as_item(
+ f'Error updating source "{source_name}"', traceback.format_exc()
+ )
def load_source(source_name):
- """
- Attempts to load the source module with the given name.
- Raises an exception on failure.
- """
- if source_name == 'inquisitor':
- return InquisitorStubSource()
+ """
+ Attempts to load the source module with the given name.
+ Raises an exception on failure.
+ """
+ if source_name == "inquisitor":
+ return InquisitorStubSource()
- cwd = os.getcwd()
- try:
- # Push the sources directory.
- os.chdir(SOURCES_PATH)
- # Make the sources directory importable while working with sources.
- if SOURCES_PATH not in sys.path:
- sys.path.insert(0, SOURCES_PATH)
+ cwd = os.getcwd()
+ try:
+ # Push the sources directory.
+ os.chdir(SOURCES_PATH)
+ # Make the sources directory importable while working with sources.
+ if SOURCES_PATH not in sys.path:
+ sys.path.insert(0, SOURCES_PATH)
- # Check if the named source is present.
- source_file_name = source_name + '.py'
- if not os.path.isfile(source_file_name):
- raise FileNotFoundError(f'Missing "{source_name}" in "{SOURCES_PATH}"')
+ # Check if the named source is present.
+ source_file_name = source_name + ".py"
+ if not os.path.isfile(source_file_name):
+ raise FileNotFoundError(f'Missing "{source_name}" in "{SOURCES_PATH}"')
- # Import the source module by file path.
- logger.debug(f'Loading module "{source_file_name}"')
- spec = importlib.util.spec_from_file_location(source_name, source_file_name)
- itemsource = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(itemsource)
- itemsource = importlib.import_module(source_name)
+ # Import the source module by file path.
+ logger.debug(f'Loading module "{source_file_name}"')
+ spec = importlib.util.spec_from_file_location(source_name, source_file_name)
+ itemsource = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(itemsource)
+ itemsource = importlib.import_module(source_name)
- # Require fetch_new().
- if not hasattr(itemsource, 'fetch_new'):
- raise ImportError(f'Missing fetch_new in "{source_file_name}"')
+ # Require fetch_new().
+ if not hasattr(itemsource, "fetch_new"):
+ raise ImportError(f'Missing fetch_new in "{source_file_name}"')
- return itemsource
+ return itemsource
- finally:
- os.chdir(cwd)
- if SOURCES_PATH in sys.path:
- sys.path.remove(SOURCES_PATH)
+ finally:
+ os.chdir(cwd)
+ if SOURCES_PATH in sys.path:
+ sys.path.remove(SOURCES_PATH)
def update_source(source_name, source):
- """
- Attempts to update the given source. Raises an exception if the source does.
- """
- # Get a list of item ids that already existed in this source's cell.
- prior_ids = loader.get_item_ids(source_name)
- logger.debug(f'Found {len(prior_ids)} prior items')
+ """
+ Attempts to update the given source. Raises an exception if the source does.
+ """
+ # Get a list of item ids that already existed in this source's cell.
+ prior_ids = loader.get_item_ids(source_name)
+ logger.debug(f"Found {len(prior_ids)} prior items")
- # Get the feed items from the source's fetch method.
- state = loader.load_state(source_name)
- fetched = source.fetch_new(state)
- state.flush()
- logger.debug(f'Fetched {len(fetched)} items')
- fetched_items = {item['id']: item for item in fetched}
+ # Get the feed items from the source's fetch method.
+ state = loader.load_state(source_name)
+ fetched = source.fetch_new(state)
+ state.flush()
+ logger.debug(f"Fetched {len(fetched)} items")
+ fetched_items = {item["id"]: item for item in fetched}
- # Determine which items are new and which are updates.
- # We query the file system here instead of checking against this source's
- # item ids from above because sources are allowed to generate in other
- # sources' cells.
- new_items = []
- updated_items = []
- for item in fetched:
- item_source = item.get('source', source_name)
- if loader.item_exists(item_source, item['id']):
- updated_items.append(item)
- else:
- new_items.append(item)
+ # Determine which items are new and which are updates.
+ # We query the file system here instead of checking against this source's
+ # item ids from above because sources are allowed to generate in other
+ # sources' cells.
+ new_items = []
+ updated_items = []
+ for item in fetched:
+ item_source = item.get("source", source_name)
+ if loader.item_exists(item_source, item["id"]):
+ updated_items.append(item)
+ else:
+ new_items.append(item)
- # Write all the new items to the source's cell.
- has_create_handler = hasattr(source, 'on_create')
- for item in new_items:
- item_source = item.get('source', source_name)
- created_item = loader.new_item(item_source, item)
- if has_create_handler:
- # Because some sources do not return items more than once,
- # exceptions in the on-create handler must be squashed.
- try:
- source.on_create(state, created_item)
- except:
- error.as_item(
- f'Exception in {source_name}.on_create',
- traceback.format_exc())
+ # Write all the new items to the source's cell.
+ has_create_handler = hasattr(source, "on_create")
+ for item in new_items:
+ item_source = item.get("source", source_name)
+ created_item = loader.new_item(item_source, item)
+ if has_create_handler:
+ # Because some sources do not return items more than once,
+ # exceptions in the on-create handler must be squashed.
+ try:
+ source.on_create(state, created_item)
+ except:
+ error.as_item(
+ f"Exception in {source_name}.on_create", traceback.format_exc()
+ )
- # Update the other items using the fetched items' values.
- for new_item in updated_items:
- old_item = loader.load_item(new_item['source'], new_item['id'])
- for field in USE_NEWEST:
- if field in new_item and old_item[field] != new_item[field]:
- old_item[field] = new_item[field]
- if 'callback' in new_item:
- old_callback = old_item.get('callback', {})
- # Because of the way this update happens, any fields that are set
- # in the callback when the item is new will keep their original
- # values, as those values reappear in new_item on subsequent
- # updates.
- old_item['callback'] = {**old_item['callback'], **new_item['callback']}
+ # Update the other items using the fetched items' values.
+ for new_item in updated_items:
+ old_item = loader.load_item(new_item["source"], new_item["id"])
+ for field in USE_NEWEST:
+ if field in new_item and old_item[field] != new_item[field]:
+ old_item[field] = new_item[field]
+ if "callback" in new_item:
+ old_callback = old_item.get("callback", {})
+ # Because of the way this update happens, any fields that are set
+ # in the callback when the item is new will keep their original
+ # values, as those values reappear in new_item on subsequent
+ # updates.
+ old_item["callback"] = {**old_item["callback"], **new_item["callback"]}
- # In general, items are removed when they are old (not found in the last
- # fetch) and inactive. Some item fields can change this basic behavior.
- del_count = 0
- now = timestamp.now()
- has_delete_handler = hasattr(source, 'on_delete')
- fetched_ids = [item['id'] for item in updated_items]
- old_item_ids = [
- item_id for item_id in prior_ids
- if item_id not in fetched_ids]
- for item_id in old_item_ids:
- item = loader.load_item(source_name, item_id)
- remove = not item['active']
- # The time-to-live field protects an item from removal until expiry.
- # This is mainly used to avoid old items resurfacing when their source
- # cannot guarantee monotonicity.
- if 'ttl' in item:
- ttl_date = item['created'] + item['ttl']
- if ttl_date > now:
- continue
- # The time-to-die field can force an active item to be removed.
- if 'ttd' in item:
- ttd_date = item['created'] + item['ttd']
- if ttd_date < now:
- remove = True
- # Items to be removed are deleted
- if remove:
- try:
- if has_delete_handler:
- # Run the delete handler so exceptions prevent deletions
- source.on_delete(state, item)
- loader.delete_item(source_name, item['id'])
- del_count += 1
- except:
- error.as_item(
- f'Failed to delete {source_name}/{item["id"]}',
- traceback.format_exc())
+ # In general, items are removed when they are old (not found in the last
+ # fetch) and inactive. Some item fields can change this basic behavior.
+ del_count = 0
+ now = timestamp.now()
+ has_delete_handler = hasattr(source, "on_delete")
+ fetched_ids = [item["id"] for item in updated_items]
+ old_item_ids = [item_id for item_id in prior_ids if item_id not in fetched_ids]
+ for item_id in old_item_ids:
+ item = loader.load_item(source_name, item_id)
+ remove = not item["active"]
+ # The time-to-live field protects an item from removal until expiry.
+ # This is mainly used to avoid old items resurfacing when their source
+ # cannot guarantee monotonicity.
+ if "ttl" in item:
+ ttl_date = item["created"] + item["ttl"]
+ if ttl_date > now:
+ continue
+ # The time-to-die field can force an active item to be removed.
+ if "ttd" in item:
+ ttd_date = item["created"] + item["ttd"]
+ if ttd_date < now:
+ remove = True
+ # Items to be removed are deleted
+ if remove:
+ try:
+ if has_delete_handler:
+ # Run the delete handler so exceptions prevent deletions
+ source.on_delete(state, item)
+ loader.delete_item(source_name, item["id"])
+ del_count += 1
+ except:
+ error.as_item(
+ f'Failed to delete {source_name}/{item["id"]}',
+ traceback.format_exc(),
+ )
- # Note update timestamp in state
- state['last_updated'] = timestamp.now()
+ # Note update timestamp in state
+ state["last_updated"] = timestamp.now()
- # Log counts
- logger.info("{} new item{}, {} deleted item{}".format(
- len(new_items), "s" if len(new_items) != 1 else "",
- del_count, "s" if del_count != 1 else ""))
+ # Log counts
+ logger.info(
+ "{} new item{}, {} deleted item{}".format(
+ len(new_items),
+ "s" if len(new_items) != 1 else "",
+ del_count,
+ "s" if del_count != 1 else "",
+ )
+ )
def item_callback(source_name, itemid):
- try:
- # Load the module with the callback function
- source_module = load_source(source_name)
- if not hasattr(source_module, 'callback'):
- raise ImportError(f"Missing callback in '{source_name}'")
- # Load the source state and the origin item
- state = loader.load_state(source_name)
- item = loader.load_item(source_name, itemid)
- # Execute callback
- source_module.callback(state, item)
- # Save any changes
- item.flush()
- state.flush()
- except Exception:
- error.as_item(
- f"Error executing callback for {source_name}/{itemid}",
- traceback.format_exc())
+ try:
+ # Load the module with the callback function
+ source_module = load_source(source_name)
+ if not hasattr(source_module, "callback"):
+ raise ImportError(f"Missing callback in '{source_name}'")
+ # Load the source state and the origin item
+ state = loader.load_state(source_name)
+ item = loader.load_item(source_name, itemid)
+ # Execute callback
+ source_module.callback(state, item)
+ # Save any changes
+ item.flush()
+ state.flush()
+ except Exception:
+ error.as_item(
+ f"Error executing callback for {source_name}/{itemid}",
+ traceback.format_exc(),
+ )
diff --git a/inquisitor/templates.py b/inquisitor/templates.py
index bd30102..a3ce408 100644
--- a/inquisitor/templates.py
+++ b/inquisitor/templates.py
@@ -17,212 +17,221 @@ import requests
# Module imports
from inquisitor import CACHE_PATH
-logger = logging.getLogger('inquisitor.templates')
+logger = logging.getLogger("inquisitor.templates")
def cache_image(source, url, filename):
- # Define some paths
- path = os.path.join(CACHE_PATH, source)
- file_path = os.path.join(path, filename)
- cached_url = f'/cache/{source}/{filename}'
- # Ensure cache folder
- if not os.path.isdir(path):
- os.mkdir(path)
- # Fetch url
- logger.info(f'Caching {url} to {file_path}')
- response = requests.get(url)
- # Write file to disk
- with open(file_path, 'wb') as f:
- f.write(response.content)
- # Return the inquisitor path to the file
- return cached_url
+ # Define some paths
+ path = os.path.join(CACHE_PATH, source)
+ file_path = os.path.join(path, filename)
+ cached_url = f"/cache/{source}/{filename}"
+ # Ensure cache folder
+ if not os.path.isdir(path):
+ os.mkdir(path)
+ # Fetch url
+ logger.info(f"Caching {url} to {file_path}")
+ response = requests.get(url)
+ # Write file to disk
+ with open(file_path, "wb") as f:
+ f.write(response.content)
+ # Return the inquisitor path to the file
+ return cached_url
class LinearCrawler:
- """
- An engine for generating items from web sources that link content
- together in a linear fashion, such as webcomics.
- """
- def fetch_new(self, state):
- items = []
- max_iter = self.max_iterations() - 1
- new = self.try_fetch(state)
- items.extend(new)
- for iter in range(max_iter):
- sleep(1)
- # If we've already gotten some items out of this fetch, we don't
- # want to lose them and have the state still be set to the next
- # page, so we wrap further calls in a try block and force return
- # if we hit an error.
- try:
- new = self.try_fetch(state)
- except:
- new = []
- items.extend(new)
- # Cut out early if there was nothing returned
- if not new:
- break
- return items
+ """
+ An engine for generating items from web sources that link content
+ together in a linear fashion, such as webcomics.
+ """
- def try_fetch(self, state):
- # Check for whether a new page should be crawled
- if 'current_page' not in state:
- next_page = self.get_start_url()
- else:
- current = state['current_page']
- response = requests.get(current)
- soup = BeautifulSoup(response.text, features='html.parser')
- next_page = self.get_next_page_url(current, soup)
- if not next_page:
- return [] # nothing new
+ def fetch_new(self, state):
+ items = []
+ max_iter = self.max_iterations() - 1
+ new = self.try_fetch(state)
+ items.extend(new)
+ for iter in range(max_iter):
+ sleep(1)
+ # If we've already gotten some items out of this fetch, we don't
+ # want to lose them and have the state still be set to the next
+ # page, so we wrap further calls in a try block and force return
+ # if we hit an error.
+ try:
+ new = self.try_fetch(state)
+ except:
+ new = []
+ items.extend(new)
+ # Cut out early if there was nothing returned
+ if not new:
+ break
+ return items
- # Download the new page
- logger.info('Fetching ' + next_page)
- response = requests.get(next_page)
- soup = BeautifulSoup(response.text, features="html.parser")
+ def try_fetch(self, state):
+ # Check for whether a new page should be crawled
+ if "current_page" not in state:
+ next_page = self.get_start_url()
+ else:
+ current = state["current_page"]
+ response = requests.get(current)
+ soup = BeautifulSoup(response.text, features="html.parser")
+ next_page = self.get_next_page_url(current, soup)
+ if not next_page:
+ return [] # nothing new
- # Create an item from the page
- item = self.make_item(next_page, soup)
+ # Download the new page
+ logger.info("Fetching " + next_page)
+ response = requests.get(next_page)
+ soup = BeautifulSoup(response.text, features="html.parser")
- # Update the state and return the item
- state['current_page'] = next_page
- return [item]
+ # Create an item from the page
+ item = self.make_item(next_page, soup)
- def max_iterations(self):
- return 3
+ # Update the state and return the item
+ state["current_page"] = next_page
+ return [item]
- def get_start_url(self):
- raise NotImplementedError('get_start_url is required')
+ def max_iterations(self):
+ return 3
- def get_next_page_url(self, url, soup):
- raise NotImplementedError('get_next_page_url is required')
+ def get_start_url(self):
+ raise NotImplementedError("get_start_url is required")
- def make_item(self, url, soup):
- raise NotImplementedError('make_item is required')
+ def get_next_page_url(self, url, soup):
+ raise NotImplementedError("get_next_page_url is required")
+
+ def make_item(self, url, soup):
+ raise NotImplementedError("make_item is required")
class RedditScraper:
- """
- An engine for generating items from subreddits.
- Requires defining source, subreddit_name
- fetch new with RedditScraper.fetch_new(state, __name__, reddit)
- """
- @staticmethod
- def fetch_new(state, name, reddit):
- items = []
- for name, obj in inspect.getmembers(sys.modules[name]):
- if (inspect.isclass(obj)
- and issubclass(obj, RedditScraper)
- and obj is not RedditScraper
- ):
- sub_items = obj(reddit).get_items()
- items.extend(sub_items)
- return items
+ """
+ An engine for generating items from subreddits.
+ Requires defining source, subreddit_name
+ fetch new with RedditScraper.fetch_new(state, __name__, reddit)
+ """
- def __init__(self, reddit):
- self.reddit = reddit
+ @staticmethod
+ def fetch_new(state, name, reddit):
+ items = []
+ for name, obj in inspect.getmembers(sys.modules[name]):
+ if (
+ inspect.isclass(obj)
+ and issubclass(obj, RedditScraper)
+ and obj is not RedditScraper
+ ):
+ sub_items = obj(reddit).get_items()
+ items.extend(sub_items)
+ return items
- def get_items(self):
- sub_name = self.subreddit_name
- logger.info(f'Fetching posts from r/{sub_name}')
- subreddit = self.reddit.subreddit(sub_name)
- posts = self.subreddit_page(subreddit)
- items = []
- for post in posts:
- if self.filter_post(post):
- items.append(self.item_from_post(post))
- return items
+ def __init__(self, reddit):
+ self.reddit = reddit
- def item_from_post(self, post):
- item = {
- 'source': self.source,
- 'id': post.id,
- 'title': self.get_title(post),
- 'link': self.get_link(post),
- 'time': post.created_utc,
- 'author': '/u/' + (post.author.name if post.author else "[deleted]"),
- 'body': self.get_body(post),
- 'tags': self.get_tags(post),
- 'ttl': self.get_ttl(post),
- }
- ttl = self.get_ttl(post)
- if ttl is not None: item['ttl'] = ttl
- ttd = self.get_ttd(post)
- if ttd is not None: item['ttd'] = ttd
- tts = self.get_tts(post)
- if tts is not None: item['tts'] = tts
- callback = self.get_callback(post)
- if callback is not None: item['callback'] = callback
- return item
+ def get_items(self):
+ sub_name = self.subreddit_name
+ logger.info(f"Fetching posts from r/{sub_name}")
+ subreddit = self.reddit.subreddit(sub_name)
+ posts = self.subreddit_page(subreddit)
+ items = []
+ for post in posts:
+ if self.filter_post(post):
+ items.append(self.item_from_post(post))
+ return items
- def subreddit_page(self, subreddit):
- return subreddit.hot(limit=25)
+ def item_from_post(self, post):
+ item = {
+ "source": self.source,
+ "id": post.id,
+ "title": self.get_title(post),
+ "link": self.get_link(post),
+ "time": post.created_utc,
+ "author": "/u/" + (post.author.name if post.author else "[deleted]"),
+ "body": self.get_body(post),
+ "tags": self.get_tags(post),
+ "ttl": self.get_ttl(post),
+ }
+ ttl = self.get_ttl(post)
+ if ttl is not None:
+ item["ttl"] = ttl
+ ttd = self.get_ttd(post)
+ if ttd is not None:
+ item["ttd"] = ttd
+ tts = self.get_tts(post)
+ if tts is not None:
+ item["tts"] = tts
+ callback = self.get_callback(post)
+ if callback is not None:
+ item["callback"] = callback
+ return item
- def filter_post(self, post):
- return True
+ def subreddit_page(self, subreddit):
+ return subreddit.hot(limit=25)
- def get_title(self, post):
- s = '[S] ' if post.spoiler else ''
- nsfw = '[NSFW] ' if post.over_18 else ''
- return f'{s}{nsfw}/{post.subreddit_name_prefixed}: {post.title}'
+ def filter_post(self, post):
+ return True
- def get_link(self, post):
- return f'https://reddit.com{post.permalink}'
+ def get_title(self, post):
+ s = "[S] " if post.spoiler else ""
+ nsfw = "[NSFW] " if post.over_18 else ""
+ return f"{s}{nsfw}/{post.subreddit_name_prefixed}: {post.title}"
- def get_body(self, post):
- parts = []
- if not post.is_self:
- parts.append(f'link: {post.url}')
- if hasattr(post, 'preview'):
- try:
- previews = post.preview['images'][0]['resolutions']
- small_previews = [p for p in previews if p['width'] < 800]
- preview = sorted(small_previews, key=lambda p:-p['width'])[0]
- parts.append(f'')
- except:
- pass
- if getattr(post, 'is_gallery', False):
- try:
- for gallery_item in post.gallery_data['items']:
- media_id = gallery_item['media_id']
- metadata = post.media_metadata[media_id]
- small_previews = [p for p in metadata['p'] if p['x'] < 800]
- preview = sorted(small_previews, key=lambda p:-p['x'])[0]
- parts.append(f'link: {metadata["s"]["u"]}')
- parts.append(f'')
- except:
- pass
- if post.selftext:
- limit = post.selftext[1024:].find(' ')
- preview_body = post.selftext[:1024 + limit]
- if len(preview_body) < len(post.selftext):
- preview_body += '[...]'
- parts.append(f'{preview_body}
')
- return '
'.join(parts)
+ def get_link(self, post):
+ return f"https://reddit.com{post.permalink}"
- def get_tags(self, post):
- tags = ['reddit', post.subreddit_name_prefixed[2:]]
- if post.over_18:
- tags.append('nsfw')
- return tags
+ def get_body(self, post):
+ parts = []
+ if not post.is_self:
+ parts.append(f'link: {post.url}')
+ if hasattr(post, "preview"):
+ try:
+ previews = post.preview["images"][0]["resolutions"]
+ small_previews = [p for p in previews if p["width"] < 800]
+ preview = sorted(small_previews, key=lambda p: -p["width"])[0]
+ parts.append(f'')
+ except:
+ pass
+ if getattr(post, "is_gallery", False):
+ try:
+ for gallery_item in post.gallery_data["items"]:
+ media_id = gallery_item["media_id"]
+ metadata = post.media_metadata[media_id]
+ small_previews = [p for p in metadata["p"] if p["x"] < 800]
+ preview = sorted(small_previews, key=lambda p: -p["x"])[0]
+ parts.append(
+ f'link: {metadata["s"]["u"]}'
+ )
+ parts.append(f'')
+ except:
+ pass
+ if post.selftext:
+ limit = post.selftext[1024:].find(" ")
+ preview_body = post.selftext[: 1024 + limit]
+ if len(preview_body) < len(post.selftext):
+ preview_body += "[...]"
+ parts.append(f"{preview_body}
")
+ return "
".join(parts)
- def get_ttl(self, post):
- return 60 * 60 * 24 * 7 # 1 week
+ def get_tags(self, post):
+ tags = ["reddit", post.subreddit_name_prefixed[2:]]
+ if post.over_18:
+ tags.append("nsfw")
+ return tags
- def get_ttd(self, post):
- return None
+ def get_ttl(self, post):
+ return 60 * 60 * 24 * 7 # 1 week
- def get_tts(self, post):
- return None
+ def get_ttd(self, post):
+ return None
- def get_callback(self, post):
- return None
+ def get_tts(self, post):
+ return None
- def callback(self, state, item):
- raise NotImplementedError('callback')
+ def get_callback(self, post):
+ return None
- def on_create(self, state, item):
- raise NotImplementedError('on_create')
+ def callback(self, state, item):
+ raise NotImplementedError("callback")
- def on_delete(self, state, item):
- raise NotImplementedError('on_delete')
+ def on_create(self, state, item):
+ raise NotImplementedError("on_create")
+
+ def on_delete(self, state, item):
+ raise NotImplementedError("on_delete")
diff --git a/inquisitor/timestamp.py b/inquisitor/timestamp.py
index 111bc43..3128432 100644
--- a/inquisitor/timestamp.py
+++ b/inquisitor/timestamp.py
@@ -1,9 +1,11 @@
import time
import datetime
+
def now():
- return int(time.time())
+ return int(time.time())
+
def stamp_to_readable(ts, formatstr="%Y-%m-%d %H:%M:%S"):
- dt = datetime.datetime.fromtimestamp(ts)
- return dt.strftime(formatstr)
+ dt = datetime.datetime.fromtimestamp(ts)
+ return dt.strftime(formatstr)