Refactor CLI to interact with new dungeon code

This commit is contained in:
Tim Van Baak 2019-06-13 23:46:22 -07:00
parent 6b2f898d36
commit f284faee92
3 changed files with 82 additions and 86 deletions

View File

@ -4,93 +4,81 @@ import logging
import os
# Application imports
from core import load_all_sources
from dungeon import Dungeon
import dungeon as dungeonlib
# Globals
logger = logging.getLogger("inquisitor.cli")
def run():
def parse_args(commands):
parser = argparse.ArgumentParser()
parser.add_argument("command", default="help", help="The command to execute", choices=list(commands.keys()))
parser.add_argument("--srcdir", help="Path to sources folder (default ./sources)", default="./sources")
parser.add_argument("--dungeon", help="Path to item cache folder (default ./dungeon)", default="./dungeon")
parser.add_argument("--sources", help="Sources to update, by name", nargs="*")
parser.add_argument("--log", default="INFO", help="Set the log level (default: INFO)")
subparsers = parser.add_subparsers(help="Command to execute", dest="command")
subparsers.required = True
update_parser = subparsers.add_parser("update", help="Fetch new items")
update_parser.add_argument("--srcdir", help="Path to sources folder (default ./sources)",
default="./sources")
update_parser.add_argument("--dungeon", help="Path to item cache folder (default ./dungeon)",
default="./dungeon")
update_parser.add_argument("--sources", help="Sources to update, by name",
nargs="*")
update_parser.set_defaults(func=update)
deactivate_parser = subparsers.add_parser("deactivate", help="Deactivate items by source")
deactivate_parser.add_argument("--srcdir", help="Path to sources folder (default ./sources)",
default="./sources")
deactivate_parser.add_argument("--dungeon", help="Path to item cache folder (default ./dungeon)",
default="./dungeon")
deactivate_parser.add_argument("--sources", help="Sources to deactivate, by name",
nargs="*")
deactivate_parser.set_defaults(func=deactivate)
args = parser.parse_args()
# Configure logging
if not os.path.isdir(args.srcdir):
print("Error: srcdir must be a directory")
exit(-1)
if not os.path.isdir(args.dungeon):
logger.error("Error: dungeon must be a directory")
exit(-1)
if not args.sources:
logger.error("Error: No sources specified")
exit(-1)
return args
def run():
# Enumerate valid commands.
g = globals()
commands = {
name[8:] : g[name]
for name in g
if name.startswith("command_")}
args = parse_args(commands)
# Configure logging.
loglevel = getattr(logging, args.log.upper())
if not isinstance(loglevel, int):
raise ValueError("Invalid log level: {}".format(args.log))
logging.basicConfig(format='[%(levelname)s:%(filename)s:%(lineno)d] %(message)s', level=loglevel)
args.func(args)
# Execute command.
commands[args.command](args)
def update(args):
def command_update(args):
"""Fetches new items from sources and stores them in the dungeon."""
if not os.path.isdir(args.srcdir):
logger.error("srcdir must be a directory")
exit(-1)
if not os.path.isdir(args.dungeon):
logger.error("dungeon must be a directory")
exit(-1)
sources = load_all_sources(args.srcdir)
source_names = [s.SOURCE for s in sources]
logger.debug("Known sources: {}".format(source_names))
if args.sources:
names = args.sources
for name in names:
if name not in source_names:
logger.error("Source not found: {}".format(name))
else:
names = source_names
dungeon = Dungeon(args.dungeon)
for itemsource in sources:
if itemsource.SOURCE in names:
new_items = dungeon.update(itemsource)
items = dungeon.get_active_items_for_folder(itemsource.SOURCE)
logger.info("{} new item{}".format(new_items, "s" if new_items != 1 else ""))
# Initialize dungeon.
dungeon = dungeonlib.Dungeon(args.dungeon)
def deactivate(args):
# Process each source argument.
for source_arg in args.sources:
dungeon.update(source_arg, args)
def command_deactivate(args):
"""Deactivates all items in the given sources."""
if not os.path.isdir(args.dungeon):
logger.error("dungeon must be a directory")
exit(-1)
sources = load_all_sources(args.srcdir)
source_names = [s.SOURCE for s in sources]
logger.debug("Known sources: {}".format(source_names))
if args.sources:
names = args.sources
for name in names:
if name not in source_names:
logger.error("Source not found: {}".format(name))
else:
names = source_names
dungeon = Dungeon(args.dungeon)
for name in names:
if os.path.isdir(os.path.join(args.dungeon, name)):
items = dungeon.get_active_items_for_folder(name)
for item in items:
dungeon.deactivate_item(name, item['id'])
else:
logger.error("Folder not found: {}".format(name))
# Initialize dungeon.
dungeon = dungeonlib.Dungeon(args.dungeon)
# Deactivate all items in each source.
for source_name in args.sources:
if source_name not in dungeon:
print("Error: No source named '{}'".format(source_name))
print("Valid source names are: " + " ".join([s for s in dungeon]))
continue
cell = dungeon[source_name]
count = 0
for item_id in cell:
item = cell[item_id]
if item['active']:
item.deactivate()
count += 1
logger.info("Deactivated {} items in '{}'".format(count, source_name))

View File

@ -7,10 +7,8 @@ import time
import random
import traceback
# Application imports
from item import create_item
import item # This will define create_item as a builtin
# Globals
logger = logging.getLogger("inquisitor.dungeon")
@ -85,7 +83,7 @@ class DungeonCell():
with open(state_path, 'r', encoding='utf-8') as f:
self.state = ast.literal_eval(f.read())
def _item_path(key):
def _item_path(self, key):
return os.path.join(self.path, key + ".item")
def __getitem__(self, key):
@ -95,7 +93,7 @@ class DungeonCell():
return ReadableItem(self.dungeon_path, self.name, key)
def __setitem__(self, key, value):
logger.info("Setting item {} in cell {}".format(key, self.name))
logger.debug("Setting item {} in cell {}".format(key, self.name))
if type(value) is ReadableItem:
value = value.item
if type(value) is not dict:
@ -105,7 +103,7 @@ class DungeonCell():
f.write(str(value))
def __delitem__(self, key):
logger.info("Deleting item '{}' in cell '{}'".format(key, self.name))
logger.debug("Deleting item '{}' in cell '{}'".format(key, self.name))
filepath = self._item_path(key)
if os.path.isfile(filepath):
os.remove(filepath)
@ -131,7 +129,7 @@ class DungeonCell():
# Get the ids of the existing items.
prior_item_ids = [item_id for item_id in self]
# Get the new items.
new_items = itemsource.fetch_new(state, args)
new_items = source.fetch_new(self.state, args)
self.save_state()
new_count = del_count = 0
for item in new_items:
@ -171,7 +169,7 @@ class Dungeon():
for filename in os.listdir(self.path):
if not os.path.isdir(os.path.join(self.path, filename)):
continue
if not os.path.isfile(os.path.join(self.path, filename, 'status')):
if not os.path.isfile(os.path.join(self.path, filename, 'state')):
continue
self.cells[filename] = DungeonCell(self.path, filename)
# Ensure Inquisitor's source is present
@ -194,7 +192,6 @@ class Dungeon():
yield name
def push_error_item(self, title, body=None):
logger.error(title)
item = create_item(
'inquisitor',
'{:x}'.format(random.getrandbits(16 * 4)),
@ -208,25 +205,33 @@ class Dungeon():
necessary. Returns the source and its DungeonCell if the source is
valid and None, None otherwise.
"""
# Check if the named source is present in the sources directory.
source_file_path = os.path.join(sources_path, source_name + ".py")
if not os.path.isfile(source_file_path):
# Move to the sources directory.
cwd = os.getcwd()
os.chdir(sources_path)
# Check if the named source is present.
source_file_name = source_name + ".py"
if not os.path.isfile(source_file_name):
os.chdir(cwd)
msg = "Could not find source '{}'".format(source_name)
logger.error(msg)
self.push_error_item(msg)
return None, None
# Try to import the source module.
try:
logger.debug("Loading module {}".format(source_file_path))
spec = importlib.util.spec_from_file_location("itemsource", source_file_path)
logger.debug("Loading module {}".format(source_file_name))
spec = importlib.util.spec_from_file_location("itemsource", source_file_name)
itemsource = importlib.util.module_from_spec(spec)
spec.loader.exec_module(itemsource)
if not hasattr(itemsource, 'fetch_new'):
raise ImportError("fetch_new missing")
except Exception:
os.chdir(cwd)
msg = "Error importing source '{}'".format(source_name)
logger.error("{}\n{}".format(msg, traceback.format_exc()))
self.push_error_item(msg, traceback.format_exc())
return None, None
# Since the source is valid, get or create the source cell.
os.chdir(cwd)
if source_name not in self:
self[source_name] = DungeonCell(self.path, source_name)
@ -255,5 +260,6 @@ class Dungeon():
deleted_count, "s" if deleted_count != 1 else ""))
except:
msg = "Error fetching items from source '{}'".format(source_name)
logger.error("{}\n{}".format(msg, traceback.format_exc()))
self.push_error_item(msg, traceback.format_exc())
return

View File

@ -2,14 +2,13 @@
import importlib.util
import os
import logging
import time
# Globals
logger = logging.getLogger("inquisitor.item")
def create_item(source, item_id, title, link=None, time=None, author=None, body=None):
import time
item = {
'id': item_id,
'source': source,
@ -26,3 +25,6 @@ def create_item(source, item_id, title, link=None, time=None, author=None, body=
if body is not None:
item['body'] = body
return item
import builtins
builtins.create_item = create_item