From f6c0b9509da44c5280855f0d660a81d38b423ac7 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Tue, 15 Jun 2021 11:53:28 -0700 Subject: [PATCH] Refactor top-level CLI parser --- amanuensis/cli/__init__.py | 110 ++++++++----------- amanuensis/cli/helpers.py | 215 +++---------------------------------- 2 files changed, 57 insertions(+), 268 deletions(-) diff --git a/amanuensis/cli/__init__.py b/amanuensis/cli/__init__.py index 6723b18..99f9c58 100644 --- a/amanuensis/cli/__init__.py +++ b/amanuensis/cli/__init__.py @@ -1,72 +1,50 @@ -# -# The cli module must not import other parts of the application at the module -# level. This is because most other modules depend on the config module. The -# config module may depend on __main__'s commandline parsing to locate config -# files, and __main__'s commandline parsing requires importing (but not -# executing) the functions in the cli module. Thus, cli functions must only -# import the config module inside the various command methods, which are only -# run after commandline parsing has already occurred. -# +from argparse import ArgumentParser -def server_commands(commands={}): - if commands: - return commands - import amanuensis.cli.server - for name, func in vars(amanuensis.cli.server).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands[name] = func - return commands +def add_subcommand(subparsers, module) -> None: + """Add a cli submodule's commands as a subparser.""" + # Get the command information from the module + command_name: str = getattr(module, "COMMAND_NAME") + command_help: str = getattr(module, "COMMAND_HELP") + if not command_name and command_help: + return + + # Add the subparser for the command and set a default action + command_parser: ArgumentParser = subparsers.add_parser( + command_name, help=command_help + ) + command_parser.set_defaults(func=lambda args: command_parser.print_usage()) + + # Add all subcommands in the command module + subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND") + for name, obj in vars(module).items(): + if name.startswith("command_"): + # Hyphenate subcommand names + sc_name: str = name[8:].replace("_", "-") + # Only the first line of the subcommand function docstring is used + sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0] + + # Add the command and any arguments defined by its decorators + subcommand: ArgumentParser = subcommands.add_parser( + sc_name, help=sc_help, description=obj.__doc__ + ) + subcommand.set_defaults(func=obj) + for args, kwargs in obj.__dict__.get("add_argument", []): + subcommand.add_argument(*args, **kwargs) -def lexicon_commands(commands={}): - if commands: - return commands - import amanuensis.cli.lexicon - for name, func in vars(amanuensis.cli.lexicon).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands["lexicon-" + name] = func - return commands +def main(): + """CLI entry point""" + # Set up the top-level parser + parser = ArgumentParser() + parser.set_defaults( + parser=parser, + func=lambda args: parser.print_usage(), + ) + # Add commands from cli submodules + subparsers = parser.add_subparsers(metavar="COMMAND") -def user_commands(commands={}): - if commands: - return commands - import amanuensis.cli.user - for name, func in vars(amanuensis.cli.user).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands["user-" + name] = func - return commands - - -def get_commands(): - return {**server_commands(), **lexicon_commands(), **user_commands()} - - -def cmd_desc(func): - return ((func.__doc__ or "").strip() or '\n').splitlines()[0] - - -def describe_commands(): - longest = max(map(len, server_commands().keys())) - server_desc = "General commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in server_commands().items() - ])) - - longest = max(map(len, lexicon_commands().keys())) - lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in lexicon_commands().items() - ])) - - longest = max(map(len, user_commands().keys())) - user_desc = "User commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in user_commands().items() - ])) - - return "\n".join([server_desc, lexicon_desc, user_desc]) + # Parse args and execute the desired action + args = parser.parse_args() + args.func(args) diff --git a/amanuensis/cli/helpers.py b/amanuensis/cli/helpers.py index 1d2a136..6bf3c31 100644 --- a/amanuensis/cli/helpers.py +++ b/amanuensis/cli/helpers.py @@ -1,209 +1,20 @@ -# Standard library imports -from argparse import ArgumentParser -from functools import wraps -from json.decoder import JSONDecodeError -from logging import getLogger -from sys import exc_info - -logger = getLogger(__name__) - - -# -# The add_argument and no_argument function wrappers allow the same -# function to both configure a command and execute it. This keeps -# command argument configuration close to where the command is defined -# and reduces the number of things the main parser has to handle. -# +""" +Helpers for cli commands. +""" def add_argument(*args, **kwargs): - """Passes the given args and kwargs to subparser.add_argument""" + """Defines an argument to a cli command.""" - def argument_adder(command): - @wraps(command) - def augmented_command(cmd_args): - # Add this wrapper's command in the parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*args, **kwargs) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None + def argument_adder(command_func): + """Decorator function for storing parser args on the function.""" - # Pass through transparently in the execute pass - return command(cmd_args) + # Store the kw/args in the function dictionary + add_args = command_func.__dict__.get("add_argument", []) + add_args.append((args, kwargs)) + command_func.__dict__["add_argument"] = add_args - # Mark the command as wrapped so control passes through - augmented_command.__dict__['wrapper'] = True - return augmented_command + # Return the same function + return command_func - return argument_adder - - -def no_argument(command): - """Noops for subparsers""" - @wraps(command) - def augmented_command(cmd_args): - # Noop in the parser pass - if isinstance(cmd_args, ArgumentParser): - return None - # Pass through in the execute pass - return command(cmd_args) - - return augmented_command - - -# -# Many commands require specifying a lexicon or user to operate on, so -# the requires_lexicon and requires_user wrappers replace @add_argument -# as well as automatically create the model for the object from the -# provided identifier. -# - - -LEXICON_ARGS = ['--lexicon'] -LEXICON_KWARGS = { - 'metavar': 'LEXICON', - 'dest': 'lexicon', - 'help': 'Specify a user to operate on'} - - -def requires_lexicon(command): - @wraps(command) - def augmented_command(cmd_args): - # Add lexicon argument in parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None - - # Verify lexicon argument in execute pass - val = getattr(cmd_args, 'lexicon', None) - if not val: - logger.error("Missing --lexicon argument") - return -1 - try: - model_factory = cmd_args.model_factory - cmd_args.lexicon = model_factory.lexicon(val) - except Exception: - ex_type, value, tb = exc_info() - logger.error( - f'Loading lexicon "{val}" failed with ' - f'{ex_type.__name__}: {value}') - return -1 - return command(cmd_args) - - augmented_command.__dict__['wrapper'] = True - return augmented_command - - -USER_ARGS = ['--user'] -USER_KWARGS = { - 'metavar': 'USER', - 'dest': 'user', - 'help': 'Specify a user to operate on'} - - -def requires_user(command): - @wraps(command) - def augmented_command(cmd_args): - # Add user argument in parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*USER_ARGS, **USER_KWARGS) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None - - # Verify user argument in execute pass - val = getattr(cmd_args, "user", None) - if not val: - logger.error("Missing --user argument") - return -1 - try: - model_factory = cmd_args.model_factory - cmd_args.user = model_factory.user(val) - except Exception: - ex_type, value, tb = exc_info() - logger.error( - f'Loading user "{val}" failed with ' - f'{ex_type.__name__}: {value}') - return -1 - return command(cmd_args) - - augmented_command.__dict__['wrapper'] = True - return augmented_command - - -# Wrapper for aliasing commands -def alias(cmd_alias): - """Adds an alias to the function dictionary""" - def aliaser(command): - aliases = command.__dict__.get('aliases', []) - aliases.append(cmd_alias) - command.__dict__['aliases'] = aliases - return command - return aliaser - - -# Helpers for common command tasks - -CONFIG_GET_ROOT_VALUE = object() - - -def config_get(cfg, pathspec): - """ - Performs config --get for a given config - - cfg is from a `with json_ro` context - path is the full pathspec, unsplit - """ - import json - - if pathspec is CONFIG_GET_ROOT_VALUE: - path = [] - else: - path = pathspec.split(".") - for spec in path: - if spec not in cfg: - logger.error("Path not found: {}".format(pathspec)) - return -1 - cfg = cfg.get(spec) - print(json.dumps(cfg, indent=2)) - return 0 - - -def config_set(obj_id, cfg, set_tuple): - """ - Performs config --set for a given config - - config is from a "with json_rw" context - set_tuple is a tuple of the pathspec and the value - """ - import json - pathspec, value = set_tuple - if not pathspec: - logger.error("Path must be non-empty") - path = pathspec.split('.') - try: - value = json.loads(value) - except JSONDecodeError: - pass # Leave value as string - for spec in path[:-1]: - if spec not in cfg: - logger.error("Path not found") - return -1 - cfg = cfg.get(spec) - key = path[-1] - if key not in cfg: - logger.error("Path not found") - return -1 - old_value = cfg[key] - cfg[key] = value - logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value)) - return 0 + return argument_adder