Convert main to new logging and model modules

This commit is contained in:
Tim Van Baak 2020-04-23 17:47:22 -07:00
parent bfe065583a
commit fdff9e5374
2 changed files with 71 additions and 98 deletions

View File

@ -1,63 +1,29 @@
# Standard library imports
import argparse
import logging
import os
import traceback
import sys
# Module imports
import amanuensis.cli as cli
from amanuensis.cli.helpers import (
USER_ARGS, USER_KWARGS, LEXICON_ARGS, LEXICON_KWARGS)
from amanuensis.cli import describe_commands, get_commands
from amanuensis.config.context import RootConfigDirectoryContext
import amanuensis.config as config
from amanuensis.errors import AmanuensisError
from amanuensis.log import init_logging
from amanuensis.models import ModelFactory
def repl(args):
"""Runs a REPL with the given lexicon"""
# Get all the cli commands' descriptions and add help and exit.
commands = {
name[8:].replace("_", "-"): func.__doc__ for name, func in vars(cli).items()
if name.startswith("command_")}
commands['help'] = "Print this message"
commands['exit'] = "Exit"
print("Amanuensis running on Lexicon {}".format(args.tl_lexicon))
while True:
# Read input in a loop.
try:
data = input("{}> ".format(args.tl_lexicon))
except EOFError:
print()
break
tokens = data.strip().split()
if not data.strip():
pass
elif tokens[0] not in commands:
print("'{}' is not a valid command.".format(tokens[0]))
elif data.strip() == "help":
print("Available commands:")
for name, func in commands.items():
print(" {}: {}".format(name, func))
elif data.strip() == "exit":
print()
break
elif data.strip():
# Execute the command by appending it to the argv the
# REPL was invoked with.
try:
argv = sys.argv[1:] + data.split()
main(argv)
except Exception as e:
traceback.print_exc()
def process_doc(docstring):
return '\n'.join([
line.strip()
for line in (docstring or "").strip().splitlines()
])
def get_parser(valid_commands):
# Set up the top-level parser.
parser = argparse.ArgumentParser(
description=cli.describe_commands(),
description=describe_commands(),
formatter_class=argparse.RawDescriptionHelpFormatter)
# The config directory.
parser.add_argument("--config-dir",
@ -73,21 +39,7 @@ def get_parser(valid_commands):
dest="log_file",
default=os.environ.get(config.ENV_LOG_FILE),
help="Enable verbose file logging")
parser.add_argument("--log-file-size",
dest="log_file_size",
default=os.environ.get(config.ENV_LOG_FILE_SIZE),
help="Maximum rolling log file size")
parser.add_argument("--log-file-num",
dest="log_file_num",
default=os.environ.get(config.ENV_LOG_FILE_NUM),
help="Maximum rolling file count")
# Lexicon settings.
parser.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS)
parser.add_argument(*USER_ARGS, **USER_KWARGS)
parser.set_defaults(
func=lambda args: repl(args)
if args.tl_lexicon
else parser.print_help())
parser.set_defaults(func=lambda args: parser.print_help())
subp = parser.add_subparsers(
metavar="COMMAND",
dest="command",
@ -109,23 +61,29 @@ def get_parser(valid_commands):
return parser
def main(argv):
# Enumerate valid commands from the CLI module.
commands = cli.get_commands()
commands = get_commands()
# Parse args
args = get_parser(commands).parse_args(argv)
# If the command is the init command, a config directory will be
# initialized at args.config_dir. Otherwise, initialize configs using
# that directory.
# First things first, initialize logging
init_logging(args.verbose, args.log_file)
logger = logging.getLogger('amanuensis')
# The init command initializes a config directory at --config-dir.
# All other commands assume that the config dir already exists.
if args.command and args.command != "init":
config.init_config(args)
args.root = RootConfigDirectoryContext(args.config_dir)
args.model_factory = ModelFactory(args.root)
# If verbose logging, dump args namespace
if args.verbose:
config.logger.debug("amanuensis")
logger.debug('amanuensis')
for key, val in vars(args).items():
config.logger.debug(" {}: {}".format(key, val))
logger.debug(f' {key}: {val}')
# Execute command.
try:
@ -134,6 +92,6 @@ def main(argv):
config.logger.error('Unexpected internal {}: {}'.format(
type(e).__name__, str(e)))
if __name__ == "__main__":
import sys
sys.exit(main(sys.argv[1:]))

View File

@ -2,12 +2,19 @@
from argparse import ArgumentParser
from functools import wraps
from json.decoder import JSONDecodeError
from logging import getLogger
from sys import exc_info
logger = getLogger(__name__)
# These function wrappers allow us to use the same function for executing a
# command and for configuring it. This keeps command arg configuration close to
# where the command is defined and allows the main parser to use the same
# function to both set up and execute commands.
#
# The add_argument and no_argument function wrappers allow the same
# function to both configure a command and execute it. This keeps
# command argument configuration close to where the command is defined
# and reduces the number of things the main parser has to handle.
#
def add_argument(*args, **kwargs):
"""Passes the given args and kwargs to subparser.add_argument"""
@ -47,13 +54,21 @@ def no_argument(command):
return augmented_command
# Wrappers for commands requiring lexicon or username options
#
# Many commands require specifying a lexicon or user to operate on, so
# the requires_lexicon and requires_user wrappers replace @add_argument
# as well as automatically create the model for the object from the
# provided identifier.
#
LEXICON_ARGS = ['--lexicon']
LEXICON_KWARGS = {
'metavar': 'LEXICON',
'dest': 'lexicon',
'help': 'Specify a user to operate on'}
def requires_lexicon(command):
@wraps(command)
def augmented_command(cmd_args):
@ -67,34 +82,33 @@ def requires_lexicon(command):
return None
# Verify lexicon argument in execute pass
val = (getattr(cmd_args, 'lexicon')
if hasattr(cmd_args, 'lexicon')
else None)
val = getattr(cmd_args, 'lexicon', None)
if not val:
from amanuensis.config import logger
logger.error("This command requires specifying a lexicon")
logger.error("Missing --lexicon argument")
return -1
from amanuensis.lexicon import LexiconModel
cmd_args.lexicon = LexiconModel.by(name=val) #TODO catch specific exceptions
if cmd_args.lexicon is None:
from amanuensis.config import logger
logger.error('Could not find lexicon "{}"'.format(val))
try:
model_factory = cmd_args.model_factory
cmd_args.lexicon = model_factory.lexicon(val)
except Exception:
ex_type, value, tb = exc_info()
logger.error(
f'Loading lexicon "{val}" failed with '
f'{ex_type.__name__}: {value}')
return -1
return command(cmd_args)
augmented_command.__dict__['wrapper'] = True
return augmented_command
USER_ARGS = ['--user']
USER_KWARGS = {
'metavar': 'USER',
'dest': 'user',
'help': 'Specify a user to operate on'}
def requires_user(command):
"""
Performs all necessary setup and verification for passing a user to a CLI
command.
"""
@wraps(command)
def augmented_command(cmd_args):
# Add user argument in parser pass
@ -107,18 +121,18 @@ def requires_user(command):
return None
# Verify user argument in execute pass
val = (getattr(cmd_args, "user")
if hasattr(cmd_args, "user")
else None)
val = getattr(cmd_args, "user", None)
if not val:
from amanuensis.config import logger
logger.error("This command requires specifying a user")
logger.error("Missing --user argument")
return -1
from amanuensis.user import UserModel
cmd_args.user = UserModel.by(name=val) #TODO catch specific exceptions
if cmd_args.user is None:
from amanuensis.config import logger
logger.error('Could not find user "{}"'.format(val))
try:
model_factory = cmd_args.model_factory
cmd_args.user = model_factory.user(val)
except Exception:
ex_type, value, tb = exc_info()
logger.error(
f'Loading user "{val}" failed with '
f'{ex_type.__name__}: {value}')
return -1
return command(cmd_args)
@ -140,15 +154,16 @@ def alias(cmd_alias):
# Helpers for common command tasks
CONFIG_GET_ROOT_VALUE = object()
def config_get(cfg, pathspec):
"""
Performs config --get for a given config
cfg is from a with json_ro context
cfg is from a `with json_ro` context
path is the full pathspec, unsplit
"""
import json
from amanuensis.config import logger
if pathspec is CONFIG_GET_ROOT_VALUE:
path = []
@ -162,6 +177,7 @@ def config_get(cfg, pathspec):
print(json.dumps(cfg, indent=2))
return 0
def config_set(obj_id, cfg, set_tuple):
"""
Performs config --set for a given config
@ -170,7 +186,6 @@ def config_set(obj_id, cfg, set_tuple):
set_tuple is a tuple of the pathspec and the value
"""
import json
from amanuensis.config import logger
pathspec, value = set_tuple
if not pathspec:
logger.error("Path must be non-empty")
@ -178,7 +193,7 @@ def config_set(obj_id, cfg, set_tuple):
try:
value = json.loads(value)
except JSONDecodeError:
pass # Leave value as string
pass # Leave value as string
for spec in path[:-1]:
if spec not in cfg:
logger.error("Path not found")