Incorporate server and cli into new code #13
|
@ -1,72 +1,50 @@
|
|||
#
|
||||
# The cli module must not import other parts of the application at the module
|
||||
# level. This is because most other modules depend on the config module. The
|
||||
# config module may depend on __main__'s commandline parsing to locate config
|
||||
# files, and __main__'s commandline parsing requires importing (but not
|
||||
# executing) the functions in the cli module. Thus, cli functions must only
|
||||
# import the config module inside the various command methods, which are only
|
||||
# run after commandline parsing has already occurred.
|
||||
#
|
||||
from argparse import ArgumentParser
|
||||
|
||||
|
||||
def server_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.server
|
||||
for name, func in vars(amanuensis.cli.server).items():
|
||||
def add_subcommand(subparsers, module) -> None:
|
||||
"""Add a cli submodule's commands as a subparser."""
|
||||
# Get the command information from the module
|
||||
command_name: str = getattr(module, "COMMAND_NAME")
|
||||
command_help: str = getattr(module, "COMMAND_HELP")
|
||||
if not command_name and command_help:
|
||||
return
|
||||
|
||||
# Add the subparser for the command and set a default action
|
||||
command_parser: ArgumentParser = subparsers.add_parser(
|
||||
command_name, help=command_help
|
||||
)
|
||||
command_parser.set_defaults(func=lambda args: command_parser.print_usage())
|
||||
|
||||
# Add all subcommands in the command module
|
||||
subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND")
|
||||
for name, obj in vars(module).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands[name] = func
|
||||
return commands
|
||||
# Hyphenate subcommand names
|
||||
sc_name: str = name[8:].replace("_", "-")
|
||||
# Only the first line of the subcommand function docstring is used
|
||||
sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0]
|
||||
|
||||
# Add the command and any arguments defined by its decorators
|
||||
subcommand: ArgumentParser = subcommands.add_parser(
|
||||
sc_name, help=sc_help, description=obj.__doc__
|
||||
)
|
||||
subcommand.set_defaults(func=obj)
|
||||
for args, kwargs in obj.__dict__.get("add_argument", []):
|
||||
subcommand.add_argument(*args, **kwargs)
|
||||
|
||||
|
||||
def lexicon_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.lexicon
|
||||
for name, func in vars(amanuensis.cli.lexicon).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands["lexicon-" + name] = func
|
||||
return commands
|
||||
def main():
|
||||
"""CLI entry point"""
|
||||
# Set up the top-level parser
|
||||
parser = ArgumentParser()
|
||||
parser.set_defaults(
|
||||
parser=parser,
|
||||
func=lambda args: parser.print_usage(),
|
||||
)
|
||||
|
||||
# Add commands from cli submodules
|
||||
subparsers = parser.add_subparsers(metavar="COMMAND")
|
||||
|
||||
def user_commands(commands={}):
|
||||
if commands:
|
||||
return commands
|
||||
import amanuensis.cli.user
|
||||
for name, func in vars(amanuensis.cli.user).items():
|
||||
if name.startswith("command_"):
|
||||
name = name[8:].replace("_", "-")
|
||||
commands["user-" + name] = func
|
||||
return commands
|
||||
|
||||
|
||||
def get_commands():
|
||||
return {**server_commands(), **lexicon_commands(), **user_commands()}
|
||||
|
||||
|
||||
def cmd_desc(func):
|
||||
return ((func.__doc__ or "").strip() or '\n').splitlines()[0]
|
||||
|
||||
|
||||
def describe_commands():
|
||||
longest = max(map(len, server_commands().keys()))
|
||||
server_desc = "General commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in server_commands().items()
|
||||
]))
|
||||
|
||||
longest = max(map(len, lexicon_commands().keys()))
|
||||
lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in lexicon_commands().items()
|
||||
]))
|
||||
|
||||
longest = max(map(len, user_commands().keys()))
|
||||
user_desc = "User commands:\n{}\n".format("\n".join([
|
||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
||||
for name, func in user_commands().items()
|
||||
]))
|
||||
|
||||
return "\n".join([server_desc, lexicon_desc, user_desc])
|
||||
# Parse args and execute the desired action
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
|
|
@ -1,209 +1,20 @@
|
|||
# Standard library imports
|
||||
from argparse import ArgumentParser
|
||||
from functools import wraps
|
||||
from json.decoder import JSONDecodeError
|
||||
from logging import getLogger
|
||||
from sys import exc_info
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
#
|
||||
# The add_argument and no_argument function wrappers allow the same
|
||||
# function to both configure a command and execute it. This keeps
|
||||
# command argument configuration close to where the command is defined
|
||||
# and reduces the number of things the main parser has to handle.
|
||||
#
|
||||
"""
|
||||
Helpers for cli commands.
|
||||
"""
|
||||
|
||||
|
||||
def add_argument(*args, **kwargs):
|
||||
"""Passes the given args and kwargs to subparser.add_argument"""
|
||||
"""Defines an argument to a cli command."""
|
||||
|
||||
def argument_adder(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add this wrapper's command in the parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*args, **kwargs)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
def argument_adder(command_func):
|
||||
"""Decorator function for storing parser args on the function."""
|
||||
|
||||
# Pass through transparently in the execute pass
|
||||
return command(cmd_args)
|
||||
# Store the kw/args in the function dictionary
|
||||
add_args = command_func.__dict__.get("add_argument", [])
|
||||
add_args.append((args, kwargs))
|
||||
command_func.__dict__["add_argument"] = add_args
|
||||
|
||||
# Mark the command as wrapped so control passes through
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
# Return the same function
|
||||
return command_func
|
||||
|
||||
return argument_adder
|
||||
|
||||
|
||||
def no_argument(command):
|
||||
"""Noops for subparsers"""
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Noop in the parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
return None
|
||||
# Pass through in the execute pass
|
||||
return command(cmd_args)
|
||||
|
||||
return augmented_command
|
||||
|
||||
|
||||
#
|
||||
# Many commands require specifying a lexicon or user to operate on, so
|
||||
# the requires_lexicon and requires_user wrappers replace @add_argument
|
||||
# as well as automatically create the model for the object from the
|
||||
# provided identifier.
|
||||
#
|
||||
|
||||
|
||||
LEXICON_ARGS = ['--lexicon']
|
||||
LEXICON_KWARGS = {
|
||||
'metavar': 'LEXICON',
|
||||
'dest': 'lexicon',
|
||||
'help': 'Specify a user to operate on'}
|
||||
|
||||
|
||||
def requires_lexicon(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add lexicon argument in parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
|
||||
# Verify lexicon argument in execute pass
|
||||
val = getattr(cmd_args, 'lexicon', None)
|
||||
if not val:
|
||||
logger.error("Missing --lexicon argument")
|
||||
return -1
|
||||
try:
|
||||
model_factory = cmd_args.model_factory
|
||||
cmd_args.lexicon = model_factory.lexicon(val)
|
||||
except Exception:
|
||||
ex_type, value, tb = exc_info()
|
||||
logger.error(
|
||||
f'Loading lexicon "{val}" failed with '
|
||||
f'{ex_type.__name__}: {value}')
|
||||
return -1
|
||||
return command(cmd_args)
|
||||
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
|
||||
|
||||
USER_ARGS = ['--user']
|
||||
USER_KWARGS = {
|
||||
'metavar': 'USER',
|
||||
'dest': 'user',
|
||||
'help': 'Specify a user to operate on'}
|
||||
|
||||
|
||||
def requires_user(command):
|
||||
@wraps(command)
|
||||
def augmented_command(cmd_args):
|
||||
# Add user argument in parser pass
|
||||
if isinstance(cmd_args, ArgumentParser):
|
||||
cmd_args.add_argument(*USER_ARGS, **USER_KWARGS)
|
||||
# If there are more command wrappers, pass through to them
|
||||
if command.__dict__.get('wrapper', False):
|
||||
command(cmd_args)
|
||||
# Parser pass doesn't return a value
|
||||
return None
|
||||
|
||||
# Verify user argument in execute pass
|
||||
val = getattr(cmd_args, "user", None)
|
||||
if not val:
|
||||
logger.error("Missing --user argument")
|
||||
return -1
|
||||
try:
|
||||
model_factory = cmd_args.model_factory
|
||||
cmd_args.user = model_factory.user(val)
|
||||
except Exception:
|
||||
ex_type, value, tb = exc_info()
|
||||
logger.error(
|
||||
f'Loading user "{val}" failed with '
|
||||
f'{ex_type.__name__}: {value}')
|
||||
return -1
|
||||
return command(cmd_args)
|
||||
|
||||
augmented_command.__dict__['wrapper'] = True
|
||||
return augmented_command
|
||||
|
||||
|
||||
# Wrapper for aliasing commands
|
||||
def alias(cmd_alias):
|
||||
"""Adds an alias to the function dictionary"""
|
||||
def aliaser(command):
|
||||
aliases = command.__dict__.get('aliases', [])
|
||||
aliases.append(cmd_alias)
|
||||
command.__dict__['aliases'] = aliases
|
||||
return command
|
||||
return aliaser
|
||||
|
||||
|
||||
# Helpers for common command tasks
|
||||
|
||||
CONFIG_GET_ROOT_VALUE = object()
|
||||
|
||||
|
||||
def config_get(cfg, pathspec):
|
||||
"""
|
||||
Performs config --get for a given config
|
||||
|
||||
cfg is from a `with json_ro` context
|
||||
path is the full pathspec, unsplit
|
||||
"""
|
||||
import json
|
||||
|
||||
if pathspec is CONFIG_GET_ROOT_VALUE:
|
||||
path = []
|
||||
else:
|
||||
path = pathspec.split(".")
|
||||
for spec in path:
|
||||
if spec not in cfg:
|
||||
logger.error("Path not found: {}".format(pathspec))
|
||||
return -1
|
||||
cfg = cfg.get(spec)
|
||||
print(json.dumps(cfg, indent=2))
|
||||
return 0
|
||||
|
||||
|
||||
def config_set(obj_id, cfg, set_tuple):
|
||||
"""
|
||||
Performs config --set for a given config
|
||||
|
||||
config is from a "with json_rw" context
|
||||
set_tuple is a tuple of the pathspec and the value
|
||||
"""
|
||||
import json
|
||||
pathspec, value = set_tuple
|
||||
if not pathspec:
|
||||
logger.error("Path must be non-empty")
|
||||
path = pathspec.split('.')
|
||||
try:
|
||||
value = json.loads(value)
|
||||
except JSONDecodeError:
|
||||
pass # Leave value as string
|
||||
for spec in path[:-1]:
|
||||
if spec not in cfg:
|
||||
logger.error("Path not found")
|
||||
return -1
|
||||
cfg = cfg.get(spec)
|
||||
key = path[-1]
|
||||
if key not in cfg:
|
||||
logger.error("Path not found")
|
||||
return -1
|
||||
old_value = cfg[key]
|
||||
cfg[key] = value
|
||||
logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value))
|
||||
return 0
|
||||
|
|
Loading…
Reference in New Issue