Compare commits
5 Commits
e23019bff6
...
324788f9a1
Author | SHA1 | Date |
---|---|---|
Tim Van Baak | 324788f9a1 | |
Tim Van Baak | 32a0948deb | |
Tim Van Baak | 64973c4873 | |
Tim Van Baak | 3890d4b11e | |
Tim Van Baak | f6c0b9509d |
|
@ -1,72 +1,94 @@
|
|||
#
|
||||
# The cli module must not import other parts of the application at the module
|
||||
# level. This is because most other modules depend on the config module. The
|
||||
# config module may depend on __main__'s commandline parsing to locate config
|
||||
# files, and __main__'s commandline parsing requires importing (but not
|
||||
# executing) the functions in the cli module. Thus, cli functions must only
|
||||
# import the config module inside the various command methods, which are only
|
||||
# run after commandline parsing has already occurred.
|
||||
#
|
||||
from argparse import ArgumentParser
|
||||
import logging
|
||||
import logging.config
|
||||
|
||||
import amanuensis.cli.admin
|
||||
|
||||
|
||||
def server_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.server
|
||||
for name, func in vars(amanuensis.cli.server).items():
|
||||
LOGGING_CONFIG = {
|
||||
"version": 1,
|
||||
"formatters": {
|
||||
"fmt_basic": {
|
||||
"validate": True,
|
||||
"format": "%(message)s",
|
||||
},
|
||||
"fmt_detailed": {
|
||||
"validate": True,
|
||||
"format": "%(asctime)s %(levelname)s %(message)s"
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"hnd_stderr": {
|
||||
"class": "logging.StreamHandler",
|
||||
"level": "INFO",
|
||||
"formatter": "fmt_basic",
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
__name__: {
|
||||
"level": "DEBUG",
|
||||
"handlers": ["hnd_stderr"]
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def add_subcommand(subparsers, module) -> None:
|
||||
"""Add a cli submodule's commands as a subparser."""
|
||||
# Get the command information from the module
|
||||
command_name: str = getattr(module, "COMMAND_NAME")
|
||||
command_help: str = getattr(module, "COMMAND_HELP")
|
||||
if not command_name and command_help:
|
||||
return
|
||||
|
||||
# Add the subparser for the command and set a default action
|
||||
command_parser: ArgumentParser = subparsers.add_parser(
|
||||
command_name, help=command_help
|
||||
)
|
||||
command_parser.set_defaults(func=lambda args: command_parser.print_usage())
|
||||
|
||||
# Add all subcommands in the command module
|
||||
subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND")
|
||||
for name, obj in vars(module).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands[name] = func
|
||||
return commands
|
||||
# Hyphenate subcommand names
|
||||
sc_name: str = name[8:].replace("_", "-")
|
||||
# Only the first line of the subcommand function docstring is used
|
||||
sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0]
|
||||
|
||||
# Add the command and any arguments defined by its decorators
|
||||
subcommand: ArgumentParser = subcommands.add_parser(
|
||||
sc_name, help=sc_help, description=obj.__doc__
|
||||
)
|
||||
subcommand.set_defaults(func=obj)
|
||||
for args, kwargs in obj.__dict__.get("add_argument", []):
|
||||
subcommand.add_argument(*args, **kwargs)
|
||||
|
||||
|
||||
def lexicon_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.lexicon
|
||||
for name, func in vars(amanuensis.cli.lexicon).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands["lexicon-" + name] = func
|
||||
return commands
|
||||
def init_logger(args):
|
||||
"""Set up logging based on verbosity args"""
|
||||
if (args.verbose):
|
||||
handler = LOGGING_CONFIG["handlers"]["hnd_stderr"]
|
||||
handler["formatter"] = "fmt_detailed"
|
||||
handler["level"] = "DEBUG"
|
||||
logging.config.dictConfig(LOGGING_CONFIG)
|
||||
|
||||
|
||||
def user_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.user
|
||||
for name, func in vars(amanuensis.cli.user).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands["user-" + name] = func
|
||||
return commands
|
||||
def main():
|
||||
"""CLI entry point"""
|
||||
# Set up the top-level parser
|
||||
parser = ArgumentParser()
|
||||
parser.set_defaults(
|
||||
parser=parser,
|
||||
func=lambda args: parser.print_usage(),
|
||||
)
|
||||
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
||||
|
||||
# Add commands from cli submodules
|
||||
subparsers = parser.add_subparsers(metavar="COMMAND")
|
||||
add_subcommand(subparsers, amanuensis.cli.admin)
|
||||
|
||||
def get_commands():
|
||||
return {**server_commands(), **lexicon_commands(), **user_commands()}
|
||||
|
||||
|
||||
def cmd_desc(func):
|
||||
return ((func.__doc__ or "").strip() or '\n').splitlines()[0]
|
||||
|
||||
|
||||
def describe_commands():
|
||||
longest = max(map(len, server_commands().keys()))
|
||||
server_desc = "General commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in server_commands().items()
|
||||
]))
|
||||
|
||||
longest = max(map(len, lexicon_commands().keys()))
|
||||
lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in lexicon_commands().items()
|
||||
]))
|
||||
|
||||
longest = max(map(len, user_commands().keys()))
|
||||
user_desc = "User commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in user_commands().items()
|
||||
]))
|
||||
|
||||
return "\n".join([server_desc, lexicon_desc, user_desc])
|
||||
# Parse args and execute the desired action
|
||||
args = parser.parse_args()
|
||||
init_logger(args)
|
||||
args.func(args)
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
import collections
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
from amanuensis.db import DbContext
|
||||
|
||||
from .helpers import add_argument
|
||||
|
||||
|
||||
COMMAND_NAME = "admin"
|
||||
COMMAND_HELP = "Interact with Amanuensis."
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@add_argument("path", metavar="DB_PATH", help="Path to where the database should be created")
|
||||
@add_argument("--force", "-f", action="store_true", help="Overwrite existing database")
|
||||
@add_argument("--verbose", "-v", action="store_true", help="Enable db echo")
|
||||
def command_init_db(args) -> int:
|
||||
"""
|
||||
Initialize the Amanuensis database.
|
||||
"""
|
||||
# Check if force is required
|
||||
if not args.force and os.path.exists(args.path):
|
||||
args.parser.error(f"{args.path} already exists and --force was not specified")
|
||||
|
||||
# Initialize the database
|
||||
db_uri = f"sqlite:///{os.path.abspath(args.path)}"
|
||||
LOG.info(f"Creating database at {db_uri}")
|
||||
db = DbContext(db_uri, debug=args.verbose)
|
||||
db.create_all()
|
||||
|
||||
LOG.info("Done")
|
||||
return 0
|
||||
|
||||
|
||||
@add_argument("path", metavar="CONFIG_PATH", help="Path to the config file")
|
||||
def command_secret_key(args) -> int:
|
||||
"""
|
||||
Generate a Flask secret key.
|
||||
|
||||
The Flask server will not run unless a secret key has
|
||||
been generated.
|
||||
"""
|
||||
# Load the json config
|
||||
with open(args.path, mode="r", encoding="utf8") as f:
|
||||
config = json.load(f, object_pairs_hook=collections.OrderedDict)
|
||||
|
||||
# Set the secret key to a new random string
|
||||
config["SECRET_KEY"] = os.urandom(32).hex()
|
||||
|
||||
# Write the config back out
|
||||
with open(args.path, mode="w", encoding="utf8") as f:
|
||||
json.dump(config, f, indent=2)
|
||||
|
||||
LOG.info("Regenerated Flask secret key")
|
||||
return 0
|
|
@ -1,209 +1,20 @@
|
|||
# Standard library imports
|
||||
from argparse import ArgumentParser
|
||||
from functools import wraps
|
||||
from json.decoder import JSONDecodeError
|
||||
from logging import getLogger
|
||||
from sys import exc_info
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
#
|
||||
# The add_argument and no_argument function wrappers allow the same
|
||||
# function to both configure a command and execute it. This keeps
|
||||
# command argument configuration close to where the command is defined
|
||||
# and reduces the number of things the main parser has to handle.
|
||||
#
|
||||
"""
|
||||
Helpers for cli commands.
|
||||
"""
|
||||
|
||||
|
||||
def add_argument(*args, **kwargs):
|
||||
"""Passes the given args and kwargs to subparser.add_argument"""
|
||||
"""Defines an argument to a cli command."""
|
||||
|
||||
def argument_adder(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add this wrapper's command in the parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*args, **kwargs)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
def argument_adder(command_func):
|
||||
"""Decorator function for storing parser args on the function."""
|
||||
|
||||
# Pass through transparently in the execute pass
|
||||
return command(cmd_args)
|
||||
# Store the kw/args in the function dictionary
|
||||
add_args = command_func.__dict__.get("add_argument", [])
|
||||
add_args.append((args, kwargs))
|
||||
command_func.__dict__["add_argument"] = add_args
|
||||
|
||||
# Mark the command as wrapped so control passes through
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
# Return the same function
|
||||
return command_func
|
||||
|
||||
return argument_adder
|
||||
|
||||
|
||||
def no_argument(command):
|
||||
"""Noops for subparsers"""
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Noop in the parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
return None
|
||||
# Pass through in the execute pass
|
||||
return command(cmd_args)
|
||||
|
||||
return augmented_command
|
||||
|
||||
|
||||
#
|
||||
# Many commands require specifying a lexicon or user to operate on, so
|
||||
# the requires_lexicon and requires_user wrappers replace @add_argument
|
||||
# as well as automatically create the model for the object from the
|
||||
# provided identifier.
|
||||
#
|
||||
|
||||
|
||||
LEXICON_ARGS = ['--lexicon']
|
||||
LEXICON_KWARGS = {
|
||||
'metavar': 'LEXICON',
|
||||
'dest': 'lexicon',
|
||||
'help': 'Specify a user to operate on'}
|
||||
|
||||
|
||||
def requires_lexicon(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add lexicon argument in parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
|
||||
# Verify lexicon argument in execute pass
|
||||
val = getattr(cmd_args, 'lexicon', None)
|
||||
if not val:
|
||||
logger.error("Missing --lexicon argument")
|
||||
return -1
|
||||
try:
|
||||
model_factory = cmd_args.model_factory
|
||||
cmd_args.lexicon = model_factory.lexicon(val)
|
||||
except Exception:
|
||||
ex_type, value, tb = exc_info()
|
||||
logger.error(
|
||||
f'Loading lexicon "{val}" failed with '
|
||||
f'{ex_type.__name__}: {value}')
|
||||
return -1
|
||||
return command(cmd_args)
|
||||
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
|
||||
|
||||
USER_ARGS = ['--user']
|
||||
USER_KWARGS = {
|
||||
'metavar': 'USER',
|
||||
'dest': 'user',
|
||||
'help': 'Specify a user to operate on'}
|
||||
|
||||
|
||||
def requires_user(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add user argument in parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*USER_ARGS, **USER_KWARGS)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
|
||||
# Verify user argument in execute pass
|
||||
val = getattr(cmd_args, "user", None)
|
||||
if not val:
|
||||
logger.error("Missing --user argument")
|
||||
return -1
|
||||
try:
|
||||
model_factory = cmd_args.model_factory
|
||||
cmd_args.user = model_factory.user(val)
|
||||
except Exception:
|
||||
ex_type, value, tb = exc_info()
|
||||
logger.error(
|
||||
f'Loading user "{val}" failed with '
|
||||
f'{ex_type.__name__}: {value}')
|
||||
return -1
|
||||
return command(cmd_args)
|
||||
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
|
||||
|
||||
# Wrapper for aliasing commands
|
||||
def alias(cmd_alias):
|
||||
"""Adds an alias to the function dictionary"""
|
||||
def aliaser(command):
|
||||
aliases = command.__dict__.get('aliases', [])
|
||||
aliases.append(cmd_alias)
|
||||
command.__dict__['aliases'] = aliases
|
||||
return command
|
||||
return aliaser
|
||||
|
||||
|
||||
# Helpers for common command tasks
|
||||
|
||||
CONFIG_GET_ROOT_VALUE = object()
|
||||
|
||||
|
||||
def config_get(cfg, pathspec):
|
||||
"""
|
||||
Performs config --get for a given config
|
||||
|
||||
cfg is from a `with json_ro` context
|
||||
path is the full pathspec, unsplit
|
||||
"""
|
||||
import json
|
||||
|
||||
if pathspec is CONFIG_GET_ROOT_VALUE:
|
||||
path = []
|
||||
else:
|
||||
path = pathspec.split(".")
|
||||
for spec in path:
|
||||
if spec not in cfg:
|
||||
logger.error("Path not found: {}".format(pathspec))
|
||||
return -1
|
||||
cfg = cfg.get(spec)
|
||||
print(json.dumps(cfg, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
def config_set(obj_id, cfg, set_tuple):
|
||||
"""
|
||||
Performs config --set for a given config
|
||||
|
||||
config is from a "with json_rw" context
|
||||
set_tuple is a tuple of the pathspec and the value
|
||||
"""
|
||||
import json
|
||||
pathspec, value = set_tuple
|
||||
if not pathspec:
|
||||
logger.error("Path must be non-empty")
|
||||
path = pathspec.split('.')
|
||||
try:
|
||||
value = json.loads(value)
|
||||
except JSONDecodeError:
|
||||
pass # Leave value as string
|
||||
for spec in path[:-1]:
|
||||
if spec not in cfg:
|
||||
logger.error("Path not found")
|
||||
return -1
|
||||
cfg = cfg.get(spec)
|
||||
key = path[-1]
|
||||
if key not in cfg:
|
||||
logger.error("Path not found")
|
||||
return -1
|
||||
old_value = cfg[key]
|
||||
cfg[key] = value
|
||||
logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value))
|
||||
return 0
|
||||
|
|
|
@ -1,120 +0,0 @@
|
|||
import logging
|
||||
import os
|
||||
|
||||
from amanuensis.config import RootConfigDirectoryContext
|
||||
|
||||
from .helpers import (
|
||||
add_argument,
|
||||
no_argument,
|
||||
alias,
|
||||
config_get,
|
||||
config_set,
|
||||
CONFIG_GET_ROOT_VALUE)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@alias('i')
|
||||
@add_argument("--refresh",
|
||||
action="store_true",
|
||||
help="Refresh an existing config directory")
|
||||
def command_init(args):
|
||||
"""
|
||||
Initialize a config directory at --config-dir
|
||||
|
||||
A clean config directory will contain a config.json, a
|
||||
lexicon config directory, and a user config directory.
|
||||
|
||||
Refreshing an existing directory will add keys to the global config that
|
||||
are present in the default configs. Users and lexicons that are missing
|
||||
from the indexes will be deleted, and stale index entries will be removed.
|
||||
"""
|
||||
# Module imports
|
||||
from amanuensis.config.init import create_config_dir
|
||||
|
||||
# Verify arguments
|
||||
if args.refresh and not os.path.isdir(args.config_dir):
|
||||
print("Error: couldn't find directory '{}'".format(args.config_dir))
|
||||
|
||||
# Internal call
|
||||
create_config_dir(args.config_dir, args.refresh)
|
||||
logger.info(f'Initialized config dir at {args.config_dir}')
|
||||
return 0
|
||||
|
||||
|
||||
@alias('gs')
|
||||
@no_argument
|
||||
def command_generate_secret(args):
|
||||
"""
|
||||
Generate a Flask secret key
|
||||
|
||||
The Flask server will not run unless a secret key has
|
||||
been generated.
|
||||
"""
|
||||
root: RootConfigDirectoryContext = args.root
|
||||
secret_key: bytes = os.urandom(32)
|
||||
with root.edit_config() as cfg:
|
||||
cfg.secret_key = secret_key.hex()
|
||||
logger.info("Regenerated Flask secret key")
|
||||
return 0
|
||||
|
||||
|
||||
@alias('r')
|
||||
@add_argument("-a", "--address", default="127.0.0.1")
|
||||
@add_argument("-p", "--port", default="5000")
|
||||
@add_argument("--debug", action="store_true")
|
||||
def command_run(args):
|
||||
"""
|
||||
Run the default Flask server
|
||||
|
||||
The default Flask server is not secure, and should
|
||||
only be used for development.
|
||||
"""
|
||||
from amanuensis.server import get_app
|
||||
|
||||
root: RootConfigDirectoryContext = args.root
|
||||
|
||||
with root.read_config() as cfg:
|
||||
if cfg.secret_key is None:
|
||||
logger.error("Can't run server without a secret_key. "
|
||||
"Run generate-secet first.")
|
||||
return -1
|
||||
|
||||
get_app(root).run(host=args.address, port=args.port, debug=args.debug)
|
||||
return 0
|
||||
|
||||
|
||||
@alias('n')
|
||||
@add_argument("--get",
|
||||
metavar="PATHSPEC",
|
||||
dest="get",
|
||||
nargs="?",
|
||||
const=CONFIG_GET_ROOT_VALUE,
|
||||
help="Get the value of a config key")
|
||||
@add_argument("--set",
|
||||
metavar=("PATHSPEC", "VALUE"),
|
||||
dest="set",
|
||||
nargs=2,
|
||||
help="Set the value of a config key")
|
||||
def command_config(args):
|
||||
"""
|
||||
Interact with the global config
|
||||
|
||||
PATHSPEC is a path into the config object formatted as
|
||||
a dot-separated sequence of keys.
|
||||
"""
|
||||
root: RootConfigDirectoryContext = args.root
|
||||
|
||||
if args.get and args.set:
|
||||
logger.error("Specify one of --get and --set")
|
||||
return -1
|
||||
|
||||
if args.get:
|
||||
with root.read_config() as cfg:
|
||||
config_get(cfg, args.get)
|
||||
|
||||
if args.set:
|
||||
with root.edit_config() as cfg:
|
||||
config_set("config", cfg, args.set)
|
||||
|
||||
return 0
|
|
@ -1,5 +0,0 @@
|
|||
from .setup import init_logging
|
||||
|
||||
__all__ = [member.__name__ for member in [
|
||||
init_logging
|
||||
]]
|
|
@ -1,45 +0,0 @@
|
|||
import logging
|
||||
import logging.handlers
|
||||
|
||||
|
||||
basic_formatter = logging.Formatter(
|
||||
fmt='[{levelname}] {message}',
|
||||
style='{')
|
||||
detailed_formatter = logging.Formatter(
|
||||
fmt='[{asctime}] [{levelname}:{filename}:{lineno}] {message}',
|
||||
style='{')
|
||||
basic_console_handler = logging.StreamHandler()
|
||||
basic_console_handler.setLevel(logging.INFO)
|
||||
basic_console_handler.setFormatter(basic_formatter)
|
||||
detailed_console_handler = logging.StreamHandler()
|
||||
detailed_console_handler.setLevel(logging.DEBUG)
|
||||
detailed_console_handler.setFormatter(detailed_formatter)
|
||||
|
||||
|
||||
def get_file_handler(filename: str) -> logging.Handler:
|
||||
handler = logging.handlers.RotatingFileHandler(
|
||||
filename=filename,
|
||||
maxBytes=1000000,
|
||||
backupCount=10,
|
||||
delay=True,
|
||||
encoding='utf8',
|
||||
)
|
||||
handler.setLevel(logging.DEBUG)
|
||||
handler.setFormatter(detailed_formatter)
|
||||
return handler
|
||||
|
||||
|
||||
def init_logging(verbose: bool, log_filename: str):
|
||||
"""
|
||||
Initializes the Amanuensis logger settings
|
||||
"""
|
||||
logger = logging.getLogger("amanuensis")
|
||||
if log_filename:
|
||||
logger.addHandler(get_file_handler(log_filename))
|
||||
logger.setLevel(logging.DEBUG)
|
||||
elif verbose:
|
||||
logger.addHandler(detailed_console_handler)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
logger.addHandler(basic_console_handler)
|
||||
logger.setLevel(logging.INFO)
|
|
@ -17,6 +17,7 @@ black = "^21.5b2"
|
|||
mypy = "^0.812"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
amanuensis-cli = "amanuensis.cli:main"
|
||||
amanuensis-server = "amanuensis.server:run"
|
||||
|
||||
[tool.black]
|
||||
|
|
Loading…
Reference in New Issue