Compare commits

..

5 Commits

7 changed files with 156 additions and 434 deletions

View File

@ -1,72 +1,94 @@
# from argparse import ArgumentParser
# The cli module must not import other parts of the application at the module import logging
# level. This is because most other modules depend on the config module. The import logging.config
# config module may depend on __main__'s commandline parsing to locate config
# files, and __main__'s commandline parsing requires importing (but not import amanuensis.cli.admin
# executing) the functions in the cli module. Thus, cli functions must only
# import the config module inside the various command methods, which are only
# run after commandline parsing has already occurred.
#
def server_commands(commands={}): LOGGING_CONFIG = {
if commands: "version": 1,
return commands "formatters": {
import amanuensis.cli.server "fmt_basic": {
for name, func in vars(amanuensis.cli.server).items(): "validate": True,
"format": "%(message)s",
},
"fmt_detailed": {
"validate": True,
"format": "%(asctime)s %(levelname)s %(message)s"
},
},
"handlers": {
"hnd_stderr": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "fmt_basic",
},
},
"loggers": {
__name__: {
"level": "DEBUG",
"handlers": ["hnd_stderr"]
}
},
}
def add_subcommand(subparsers, module) -> None:
"""Add a cli submodule's commands as a subparser."""
# Get the command information from the module
command_name: str = getattr(module, "COMMAND_NAME")
command_help: str = getattr(module, "COMMAND_HELP")
if not command_name and command_help:
return
# Add the subparser for the command and set a default action
command_parser: ArgumentParser = subparsers.add_parser(
command_name, help=command_help
)
command_parser.set_defaults(func=lambda args: command_parser.print_usage())
# Add all subcommands in the command module
subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND")
for name, obj in vars(module).items():
if name.startswith("command_"): if name.startswith("command_"):
name = name[8:].replace("_", "-") # Hyphenate subcommand names
commands[name] = func sc_name: str = name[8:].replace("_", "-")
return commands # Only the first line of the subcommand function docstring is used
sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0]
# Add the command and any arguments defined by its decorators
subcommand: ArgumentParser = subcommands.add_parser(
sc_name, help=sc_help, description=obj.__doc__
)
subcommand.set_defaults(func=obj)
for args, kwargs in obj.__dict__.get("add_argument", []):
subcommand.add_argument(*args, **kwargs)
def lexicon_commands(commands={}): def init_logger(args):
if commands: """Set up logging based on verbosity args"""
return commands if (args.verbose):
import amanuensis.cli.lexicon handler = LOGGING_CONFIG["handlers"]["hnd_stderr"]
for name, func in vars(amanuensis.cli.lexicon).items(): handler["formatter"] = "fmt_detailed"
if name.startswith("command_"): handler["level"] = "DEBUG"
name = name[8:].replace("_", "-") logging.config.dictConfig(LOGGING_CONFIG)
commands["lexicon-" + name] = func
return commands
def user_commands(commands={}): def main():
if commands: """CLI entry point"""
return commands # Set up the top-level parser
import amanuensis.cli.user parser = ArgumentParser()
for name, func in vars(amanuensis.cli.user).items(): parser.set_defaults(
if name.startswith("command_"): parser=parser,
name = name[8:].replace("_", "-") func=lambda args: parser.print_usage(),
commands["user-" + name] = func )
return commands parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
# Add commands from cli submodules
subparsers = parser.add_subparsers(metavar="COMMAND")
add_subcommand(subparsers, amanuensis.cli.admin)
def get_commands(): # Parse args and execute the desired action
return {**server_commands(), **lexicon_commands(), **user_commands()} args = parser.parse_args()
init_logger(args)
args.func(args)
def cmd_desc(func):
return ((func.__doc__ or "").strip() or '\n').splitlines()[0]
def describe_commands():
longest = max(map(len, server_commands().keys()))
server_desc = "General commands:\n{}\n".format("\n".join([
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
for name, func in server_commands().items()
]))
longest = max(map(len, lexicon_commands().keys()))
lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
for name, func in lexicon_commands().items()
]))
longest = max(map(len, user_commands().keys()))
user_desc = "User commands:\n{}\n".format("\n".join([
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
for name, func in user_commands().items()
]))
return "\n".join([server_desc, lexicon_desc, user_desc])

58
amanuensis/cli/admin.py Normal file
View File

@ -0,0 +1,58 @@
import collections
import json
import logging
import os
from amanuensis.db import DbContext
from .helpers import add_argument
COMMAND_NAME = "admin"
COMMAND_HELP = "Interact with Amanuensis."
LOG = logging.getLogger(__name__)
@add_argument("path", metavar="DB_PATH", help="Path to where the database should be created")
@add_argument("--force", "-f", action="store_true", help="Overwrite existing database")
@add_argument("--verbose", "-v", action="store_true", help="Enable db echo")
def command_init_db(args) -> int:
"""
Initialize the Amanuensis database.
"""
# Check if force is required
if not args.force and os.path.exists(args.path):
args.parser.error(f"{args.path} already exists and --force was not specified")
# Initialize the database
db_uri = f"sqlite:///{os.path.abspath(args.path)}"
LOG.info(f"Creating database at {db_uri}")
db = DbContext(db_uri, debug=args.verbose)
db.create_all()
LOG.info("Done")
return 0
@add_argument("path", metavar="CONFIG_PATH", help="Path to the config file")
def command_secret_key(args) -> int:
"""
Generate a Flask secret key.
The Flask server will not run unless a secret key has
been generated.
"""
# Load the json config
with open(args.path, mode="r", encoding="utf8") as f:
config = json.load(f, object_pairs_hook=collections.OrderedDict)
# Set the secret key to a new random string
config["SECRET_KEY"] = os.urandom(32).hex()
# Write the config back out
with open(args.path, mode="w", encoding="utf8") as f:
json.dump(config, f, indent=2)
LOG.info("Regenerated Flask secret key")
return 0

View File

@ -1,209 +1,20 @@
# Standard library imports """
from argparse import ArgumentParser Helpers for cli commands.
from functools import wraps """
from json.decoder import JSONDecodeError
from logging import getLogger
from sys import exc_info
logger = getLogger(__name__)
#
# The add_argument and no_argument function wrappers allow the same
# function to both configure a command and execute it. This keeps
# command argument configuration close to where the command is defined
# and reduces the number of things the main parser has to handle.
#
def add_argument(*args, **kwargs): def add_argument(*args, **kwargs):
"""Passes the given args and kwargs to subparser.add_argument""" """Defines an argument to a cli command."""
def argument_adder(command): def argument_adder(command_func):
@wraps(command) """Decorator function for storing parser args on the function."""
def augmented_command(cmd_args):
# Add this wrapper's command in the parser pass
if isinstance(cmd_args, ArgumentParser):
cmd_args.add_argument(*args, **kwargs)
# If there are more command wrappers, pass through to them
if command.__dict__.get('wrapper', False):
command(cmd_args)
# Parser pass doesn't return a value
return None
# Pass through transparently in the execute pass # Store the kw/args in the function dictionary
return command(cmd_args) add_args = command_func.__dict__.get("add_argument", [])
add_args.append((args, kwargs))
command_func.__dict__["add_argument"] = add_args
# Mark the command as wrapped so control passes through # Return the same function
augmented_command.__dict__['wrapper'] = True return command_func
return augmented_command
return argument_adder return argument_adder
def no_argument(command):
"""Noops for subparsers"""
@wraps(command)
def augmented_command(cmd_args):
# Noop in the parser pass
if isinstance(cmd_args, ArgumentParser):
return None
# Pass through in the execute pass
return command(cmd_args)
return augmented_command
#
# Many commands require specifying a lexicon or user to operate on, so
# the requires_lexicon and requires_user wrappers replace @add_argument
# as well as automatically create the model for the object from the
# provided identifier.
#
LEXICON_ARGS = ['--lexicon']
LEXICON_KWARGS = {
'metavar': 'LEXICON',
'dest': 'lexicon',
'help': 'Specify a user to operate on'}
def requires_lexicon(command):
@wraps(command)
def augmented_command(cmd_args):
# Add lexicon argument in parser pass
if isinstance(cmd_args, ArgumentParser):
cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS)
# If there are more command wrappers, pass through to them
if command.__dict__.get('wrapper', False):
command(cmd_args)
# Parser pass doesn't return a value
return None
# Verify lexicon argument in execute pass
val = getattr(cmd_args, 'lexicon', None)
if not val:
logger.error("Missing --lexicon argument")
return -1
try:
model_factory = cmd_args.model_factory
cmd_args.lexicon = model_factory.lexicon(val)
except Exception:
ex_type, value, tb = exc_info()
logger.error(
f'Loading lexicon "{val}" failed with '
f'{ex_type.__name__}: {value}')
return -1
return command(cmd_args)
augmented_command.__dict__['wrapper'] = True
return augmented_command
USER_ARGS = ['--user']
USER_KWARGS = {
'metavar': 'USER',
'dest': 'user',
'help': 'Specify a user to operate on'}
def requires_user(command):
@wraps(command)
def augmented_command(cmd_args):
# Add user argument in parser pass
if isinstance(cmd_args, ArgumentParser):
cmd_args.add_argument(*USER_ARGS, **USER_KWARGS)
# If there are more command wrappers, pass through to them
if command.__dict__.get('wrapper', False):
command(cmd_args)
# Parser pass doesn't return a value
return None
# Verify user argument in execute pass
val = getattr(cmd_args, "user", None)
if not val:
logger.error("Missing --user argument")
return -1
try:
model_factory = cmd_args.model_factory
cmd_args.user = model_factory.user(val)
except Exception:
ex_type, value, tb = exc_info()
logger.error(
f'Loading user "{val}" failed with '
f'{ex_type.__name__}: {value}')
return -1
return command(cmd_args)
augmented_command.__dict__['wrapper'] = True
return augmented_command
# Wrapper for aliasing commands
def alias(cmd_alias):
"""Adds an alias to the function dictionary"""
def aliaser(command):
aliases = command.__dict__.get('aliases', [])
aliases.append(cmd_alias)
command.__dict__['aliases'] = aliases
return command
return aliaser
# Helpers for common command tasks
CONFIG_GET_ROOT_VALUE = object()
def config_get(cfg, pathspec):
"""
Performs config --get for a given config
cfg is from a `with json_ro` context
path is the full pathspec, unsplit
"""
import json
if pathspec is CONFIG_GET_ROOT_VALUE:
path = []
else:
path = pathspec.split(".")
for spec in path:
if spec not in cfg:
logger.error("Path not found: {}".format(pathspec))
return -1
cfg = cfg.get(spec)
print(json.dumps(cfg, indent=2))
return 0
def config_set(obj_id, cfg, set_tuple):
"""
Performs config --set for a given config
config is from a "with json_rw" context
set_tuple is a tuple of the pathspec and the value
"""
import json
pathspec, value = set_tuple
if not pathspec:
logger.error("Path must be non-empty")
path = pathspec.split('.')
try:
value = json.loads(value)
except JSONDecodeError:
pass # Leave value as string
for spec in path[:-1]:
if spec not in cfg:
logger.error("Path not found")
return -1
cfg = cfg.get(spec)
key = path[-1]
if key not in cfg:
logger.error("Path not found")
return -1
old_value = cfg[key]
cfg[key] = value
logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value))
return 0

View File

@ -1,120 +0,0 @@
import logging
import os
from amanuensis.config import RootConfigDirectoryContext
from .helpers import (
add_argument,
no_argument,
alias,
config_get,
config_set,
CONFIG_GET_ROOT_VALUE)
logger = logging.getLogger(__name__)
@alias('i')
@add_argument("--refresh",
action="store_true",
help="Refresh an existing config directory")
def command_init(args):
"""
Initialize a config directory at --config-dir
A clean config directory will contain a config.json, a
lexicon config directory, and a user config directory.
Refreshing an existing directory will add keys to the global config that
are present in the default configs. Users and lexicons that are missing
from the indexes will be deleted, and stale index entries will be removed.
"""
# Module imports
from amanuensis.config.init import create_config_dir
# Verify arguments
if args.refresh and not os.path.isdir(args.config_dir):
print("Error: couldn't find directory '{}'".format(args.config_dir))
# Internal call
create_config_dir(args.config_dir, args.refresh)
logger.info(f'Initialized config dir at {args.config_dir}')
return 0
@alias('gs')
@no_argument
def command_generate_secret(args):
"""
Generate a Flask secret key
The Flask server will not run unless a secret key has
been generated.
"""
root: RootConfigDirectoryContext = args.root
secret_key: bytes = os.urandom(32)
with root.edit_config() as cfg:
cfg.secret_key = secret_key.hex()
logger.info("Regenerated Flask secret key")
return 0
@alias('r')
@add_argument("-a", "--address", default="127.0.0.1")
@add_argument("-p", "--port", default="5000")
@add_argument("--debug", action="store_true")
def command_run(args):
"""
Run the default Flask server
The default Flask server is not secure, and should
only be used for development.
"""
from amanuensis.server import get_app
root: RootConfigDirectoryContext = args.root
with root.read_config() as cfg:
if cfg.secret_key is None:
logger.error("Can't run server without a secret_key. "
"Run generate-secet first.")
return -1
get_app(root).run(host=args.address, port=args.port, debug=args.debug)
return 0
@alias('n')
@add_argument("--get",
metavar="PATHSPEC",
dest="get",
nargs="?",
const=CONFIG_GET_ROOT_VALUE,
help="Get the value of a config key")
@add_argument("--set",
metavar=("PATHSPEC", "VALUE"),
dest="set",
nargs=2,
help="Set the value of a config key")
def command_config(args):
"""
Interact with the global config
PATHSPEC is a path into the config object formatted as
a dot-separated sequence of keys.
"""
root: RootConfigDirectoryContext = args.root
if args.get and args.set:
logger.error("Specify one of --get and --set")
return -1
if args.get:
with root.read_config() as cfg:
config_get(cfg, args.get)
if args.set:
with root.edit_config() as cfg:
config_set("config", cfg, args.set)
return 0

View File

@ -1,5 +0,0 @@
from .setup import init_logging
__all__ = [member.__name__ for member in [
init_logging
]]

View File

@ -1,45 +0,0 @@
import logging
import logging.handlers
basic_formatter = logging.Formatter(
fmt='[{levelname}] {message}',
style='{')
detailed_formatter = logging.Formatter(
fmt='[{asctime}] [{levelname}:{filename}:{lineno}] {message}',
style='{')
basic_console_handler = logging.StreamHandler()
basic_console_handler.setLevel(logging.INFO)
basic_console_handler.setFormatter(basic_formatter)
detailed_console_handler = logging.StreamHandler()
detailed_console_handler.setLevel(logging.DEBUG)
detailed_console_handler.setFormatter(detailed_formatter)
def get_file_handler(filename: str) -> logging.Handler:
handler = logging.handlers.RotatingFileHandler(
filename=filename,
maxBytes=1000000,
backupCount=10,
delay=True,
encoding='utf8',
)
handler.setLevel(logging.DEBUG)
handler.setFormatter(detailed_formatter)
return handler
def init_logging(verbose: bool, log_filename: str):
"""
Initializes the Amanuensis logger settings
"""
logger = logging.getLogger("amanuensis")
if log_filename:
logger.addHandler(get_file_handler(log_filename))
logger.setLevel(logging.DEBUG)
elif verbose:
logger.addHandler(detailed_console_handler)
logger.setLevel(logging.DEBUG)
else:
logger.addHandler(basic_console_handler)
logger.setLevel(logging.INFO)

View File

@ -17,6 +17,7 @@ black = "^21.5b2"
mypy = "^0.812" mypy = "^0.812"
[tool.poetry.scripts] [tool.poetry.scripts]
amanuensis-cli = "amanuensis.cli:main"
amanuensis-server = "amanuensis.server:run" amanuensis-server = "amanuensis.server:run"
[tool.black] [tool.black]