Revamp CLI and add add command

This commit is contained in:
Tim Van Baak 2019-12-16 23:40:46 -08:00
parent ecd9d67881
commit 69627d3fda
4 changed files with 153 additions and 73 deletions

View File

@ -1,42 +1,43 @@
# Standard library imports # Standard library imports
import argparse import argparse
import os
import logging
import sys
# Application imports # Application imports
import cli import cli
import configs
# Globals
logger = logging.getLogger("inquisitor.__main__")
def parse_args(valid_commands): def parse_args(valid_commands):
command_descs = "\n".join([ command_descs = "\n".join([
"- {0}: {1}".format(name, func.__doc__) "- {0}: {1}".format(name, func.__doc__)
for name, func in valid_commands.items()]) for name, func in valid_commands.items()])
parser = argparse.ArgumentParser(description="Available commands:\n{}\n".format(command_descs), formatter_class=argparse.RawDescriptionHelpFormatter) parser = argparse.ArgumentParser(
parser.add_argument("command", default="help", help="The command to execute", choices=valid_commands, metavar="COMMAND") description="Available commands:\n{}\n".format(command_descs),
parser.add_argument("--srcdir", help="Path to sources folder (default ./sources)", default="./sources") formatter_class=argparse.RawDescriptionHelpFormatter,
parser.add_argument("--dungeon", help="Path to item cache folder (default ./dungeon)", default="./dungeon") add_help=False)
parser.add_argument("--sources", help="Sources to update, by name", nargs="*") parser.add_argument("command",
parser.add_argument("--log", default="INFO", help="Set the log level (default: INFO)") nargs="?",
default="help",
help="The command to execute",
choices=valid_commands,
metavar="command")
parser.add_argument("args",
nargs=argparse.REMAINDER,
help="Command arguments",
metavar="args")
parser.add_argument("-v",
action="store_true",
dest="verbose",
help="Enable debug logging")
args = parser.parse_args() global print_usage
print_usage = parser.print_help
return args return parser.parse_args()
def run_flask_server(args):
"""Run the default flask server serving from the specified dungeon."""
try:
from app import app
app.run()
return 0
except Exception as e:
logger.error(e)
return (-1)
def command_help(args):
"""Print this help message and exit."""
print_usage()
return 0
def main(): def main():
# Enumerate valid commands. # Enumerate valid commands.
@ -44,21 +45,19 @@ def main():
name[8:] : func name[8:] : func
for name, func in vars(cli).items() for name, func in vars(cli).items()
if name.startswith("command_")} if name.startswith("command_")}
commands["run"] = run_flask_server commands['help'] = command_help
args = parse_args(commands) args = parse_args(commands)
# Configure logging. # Configure logging.
if args.command != 'run': if args.verbose:
loglevel = getattr(logging, args.log.upper()) configs.log_verbose()
if not isinstance(loglevel, int):
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(format='[%(levelname)s:%(filename)s:%(lineno)d] %(message)s', level=loglevel)
# Execute command. # Execute command.
if args.command: if args.command:
return commands[args.command](args) return commands[args.command](args.args)
if __name__ == "__main__": if __name__ == "__main__":
import sys
sys.exit(main()) sys.exit(main())

View File

@ -1,60 +1,131 @@
# Standard library imports # Standard library imports
import argparse
import json
import logging import logging
import os import os
import random
# Application imports # Application imports
import dungeon as dungeonlib from configs import logger, DUNGEON_PATH, SOURCES_PATH
# Globals
logger = logging.getLogger("inquisitor.cli")
def command_update(args): def command_update(args):
"""Fetch and store new items from the specified sources.""" """Fetch and store new items from the specified sources."""
if not os.path.isdir(args.srcdir): parser = argparse.ArgumentParser(
print("update: Error: srcdir must be a directory") prog="inquisitor update",
return (-1) description=command_update.__doc__,
if not os.path.isdir(args.dungeon): add_help=False)
logger.error("update: Error: dungeon must be a directory") parser.add_argument("source",
return (-1) nargs="*",
if not args.sources: help="Sources to update.")
logger.error("update: Error: No sources specified") args = parser.parse_args(args)
return (-1)
# Initialize dungeon. if len(args.source) == 0:
dungeon = dungeonlib.Dungeon(args.dungeon) parser.print_help()
return 0
# Process each source argument. if not os.path.isdir(DUNGEON_PATH):
for source_arg in args.sources: logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
dungeon.update(source_arg, args) return -1
if not os.path.isdir(SOURCES_PATH):
logger.error("Couldn't find sources. Set INQUISITOR_SOURCES or cd to parent folder of ./sources")
# Update sources
from importer import update_sources
update_sources(*args.source)
return 0 return 0
def command_deactivate(args): def command_deactivate(args):
"""Deactivate all items in the specified dungeon cells.""" """Deactivate all items in the specified dungeon cells."""
if not os.path.isdir(args.dungeon): parser = argparse.ArgumentParser(
logger.error("deactivate: Error: dungeon must be a directory") prog="inquisitor deactivate",
return (-1) description=command_deactivate.__doc__,
if not args.sources: add_help=False)
logger.error("deactivate: Error: No sources specified") parser.add_argument("source",
return (-1) nargs="*",
help="Cells to deactivate.")
args = parser.parse_args(args)
# Initialize dungeon. if len(args.source) == 0:
dungeon = dungeonlib.Dungeon(args.dungeon) parser.print_help()
return 0
if not os.path.isdir(DUNGEON_PATH):
logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
return -1
# Deactivate all items in each source. # Deactivate all items in each source.
for source_name in args.sources: from loader import load_items
if source_name not in dungeon: for source_name in args.source:
print("Error: No source named '{}'".format(source_name)) path = os.path.join(DUNGEON_PATH, source_name)
print("Valid source names are: " + " ".join([s for s in dungeon])) if not os.path.isdir(path):
continue logger.warning("'{}' is not an extant source".format(source_name))
cell = dungeon[source_name]
count = 0 count = 0
for item_id in cell: items, _ = load_items(source_name)
item = cell[item_id] for item in items.values():
if item['active']: if item['active']:
item.deactivate() item['active'] = False
count += 1 count += 1
logger.info("Deactivated {} items in '{}'".format(count, source_name)) logger.info("Deactivated {} items in '{}'".format(count, source_name))
return 0 return 0
def command_add(args):
"""Creates an item."""
parser = argparse.ArgumentParser(
prog="inquisitor add",
description=command_add.__doc__,
add_help=False)
parser.add_argument("--id", help="String")
parser.add_argument("--source", help="String")
parser.add_argument("--title", help="String")
parser.add_argument("--link", help="URL")
parser.add_argument("--time", type=int, help="Unix timestmap")
parser.add_argument("--author", help="String")
parser.add_argument("--body", help="HTML")
parser.add_argument("--tags", help="Comma-separated list")
parser.add_argument("--ttl", type=int, help="Cleanup protection in seconds")
args = parser.parse_args(args)
if not args.title:
parser.print_help()
return 0
if not os.path.isdir(DUNGEON_PATH):
logger.error("Couldn't find dungeon. Set INQUISITOR_DUNGEON or cd to parent folder of ./dungeon")
return -1
from importer import populate_new
item = {
'id': '{:x}'.format(random.getrandbits(16 * 4)),
'source': 'inquisitor'
}
if args.id: item['id'] = str(args.id)
if args.source: item['source'] = str(args.source)
if args.title: item['title'] = str(args.title)
if args.link: item['link'] = str(args.link)
if args.time: item['time'] = int(args.time)
if args.author: item['author'] = str(args.author)
if args.body: item['body'] = str(args.body)
if args.tags: item['tags'] = [str(tag) for tag in args.tags]
if args.ttl: item['ttl'] = int(args.ttl)
populate_new(item)
s = json.dumps(item, indent=2)
path = os.path.join(DUNGEON_PATH, item['source'], item['id'] + '.item')
with open(path, 'w', encoding='utf8') as f:
f.write(s)
logger.info(item)
# def command_run(args):
# """Run the default Flask server."""
# pass
# def run_flask_server(args):
# """Run the default flask server serving from the specified dungeon."""
# try:
# from app import app
# app.run()
# return 0
# except Exception as e:
# logger.error(e)
# return (-1)

View File

@ -5,9 +5,19 @@ DUNGEON_PATH = os.path.abspath(os.environ.get("INQUISITOR_DUNGEON") or "./dungeo
SOURCES_PATH = os.path.abspath(os.environ.get("INQUISITOR_SOURCES") or "./sources") SOURCES_PATH = os.path.abspath(os.environ.get("INQUISITOR_SOURCES") or "./sources")
logger = logging.getLogger("inquisitor") logger = logging.getLogger("inquisitor")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler() handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
formatter = logging.Formatter('[{levelname}] {message}', style="{")
handler.setFormatter(formatter)
logger.addHandler(handler) logger.addHandler(handler)
def log_normal():
logger.setLevel(logging.INFO)
handler.setLevel(logging.INFO)
formatter = logging.Formatter('[{levelname}] {message}', style="{")
handler.setFormatter(formatter)
def log_verbose():
logger.setLevel(logging.DEBUG)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('[{asctime}] [{levelname}:{filename}:{lineno}] {message}', style="{")
handler.setFormatter(formatter)
log_normal()

View File

@ -74,7 +74,7 @@ def update_source(source_name, fetch_new):
# If the item is new, write it. # If the item is new, write it.
new_count += 1 new_count += 1
s = json.dumps(item) s = json.dumps(item)
path = os.path.join(DUNGEON_PATH, item['source'], item['id']) path = os.path.join(DUNGEON_PATH, item['source'], item['id'] + ".item")
with open(path, 'w', encoding="utf8") as f: with open(path, 'w', encoding="utf8") as f:
f.write(s) f.write(s)