From db83c99b9512784ed91a5cf77763b380c373b50f Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Wed, 15 Jan 2020 18:11:09 -0800 Subject: [PATCH] Split cli package into topical files --- amanuensis/cli/__init__.py | 204 +------------------------------------ amanuensis/cli/helpers.py | 30 ++++++ amanuensis/cli/lexicon.py | 1 + amanuensis/cli/server.py | 87 ++++++++++++++++ amanuensis/cli/user.py | 85 ++++++++++++++++ 5 files changed, 205 insertions(+), 202 deletions(-) create mode 100644 amanuensis/cli/helpers.py create mode 100644 amanuensis/cli/lexicon.py create mode 100644 amanuensis/cli/server.py create mode 100644 amanuensis/cli/user.py diff --git a/amanuensis/cli/__init__.py b/amanuensis/cli/__init__.py index a18a614..e310560 100644 --- a/amanuensis/cli/__init__.py +++ b/amanuensis/cli/__init__.py @@ -1,7 +1,3 @@ -# Standard library imports -from argparse import ArgumentParser as AP -from functools import wraps - # # The cli module must not import other parts of the application at the module # level. This is because most other modules depend on the config module. The @@ -12,201 +8,5 @@ from functools import wraps # run after commandline parsing has already occurred. # -# -# These function wrappers are used to make the command_* methods accept an -# ArgumentParser as a parameter, which it then configures with the given -# argument and returns. This way, we can configure each command's subparser -# in this module without having to write a separate function to configure it. -# -def add_argument(*args, **kwargs): - """Passes the given args and kwargs to subparser.add_argument""" - def argument_adder(command): - second_layer = command.__dict__.get('wrapper', False) - @wraps(command) - def augmented_command(cmd_args): - if type(cmd_args) is AP: - cmd_args.add_argument(*args, **kwargs) - if type(cmd_args) is not AP or second_layer: - command(cmd_args) - augmented_command.__dict__['wrapper'] = True - return augmented_command - return argument_adder - -def no_argument(command): - """Noops for subparsers""" - @wraps(command) - def augmented_command(cmd_args): - if type(cmd_args) is not AP: - command(cmd_args) - return augmented_command - - -@add_argument("--update", action="store_true", help="Refresh an existing config directory") -def command_init(args): - """Initialize a config directory at the directory given by --config-dir""" - from collections import OrderedDict - import fcntl - import json - import os - import pkg_resources - - cfd = args.config_dir - # Create the directory if it doesn't exist. - if not os.path.isdir(cfd): - os.mkdir(cfd) - # The directory should be empty if we're not updating an existing one. - if len(os.listdir(cfd)) > 0 and not args.update: - print("Directory {} is not empty".format(cfd)) - return -1 - - # Update or create global config. - def_cfg = pkg_resources.resource_stream(__name__, "resources/default_config.json") - if args.update and os.path.isfile(os.path.join(cfd, "config.json")): - with open(os.path.join(cfd, "config.json"), 'r+', encoding='utf8') as cfg_file: - fcntl.lockf(cfg_file, fcntl.LOCK_EX) - old_cfg = json.load(cfg_file, object_pairs_hook=OrderedDict) - new_cfg = json.load(def_cfg, object_pairs_hook=OrderedDict) - merged = {} - for key in new_cfg: - merged[key] = old_cfg[key] if key in old_cfg else new_cfg[key] - if key not in old_cfg: - print("Added key '{}' to config".format(key)) - for key in old_cfg: - if key not in new_cfg: - print("Config contains unknown key '{}'".format(key)) - merged[key] = old_cfg[key] - cfg_file.seek(0) - json.dump(merged, cfg_file, allow_nan=False, indent='\t') - cfg_file.truncate() - fcntl.lockf(cfg_file, fcntl.LOCK_UN) - else: - with open(os.path.join(cfd, "config.json"), 'wb') as f: - f.write(def_cfg.read()) - # Ensure pidfile exists. - if not os.path.isfile(os.path.join(cfd, "pid")): - with open(os.path.join(cfd, "pid"), 'w') as f: - f.write(str(os.getpid())) - # Ensure subdirs exist. - if not os.path.isdir(os.path.join(cfd, "lexicon")): - os.mkdir(os.path.join(cfd, "lexicon")) - if not os.path.isdir(os.path.join(cfd, "user")): - os.mkdir(os.path.join(cfd, "user")) - if not os.path.isfile(os.path.join(cfd, 'user', 'index.json')): - with open(os.path.join(cfd, 'user', 'index.json'), 'w') as f: - json.dump({}, f) - -@no_argument -def command_generate_secret(args): - """Generate a secret key for Flask""" - import os - - import config - - secret_key = os.urandom(32) - with config.json_rw("config.json") as cfg: - cfg['secret_key'] = secret_key.hex() - config.logger.info("Regenerated Flask secret key") - -@add_argument("-a", "--address", default="127.0.0.1") -@add_argument("-p", "--port", default="5000") -def command_run(args): - """Runs the default Flask development server""" - import server - import config - - if config.get("secret_key") is None: - config.logger.error("Can't run server without a secret_key. Run generate-secret first") - return -1 - server.app.run(host=args.address, port=args.port) - -@add_argument("--username", help="User's login handle") -@add_argument("--displayname", help="User's publicly displayed name") -@add_argument("--email", help="User's email") -def command_user_add(args): - """Creates a user""" - import json - - import user - import config - - # Verify or query parameters - if not args.username: - args.username = input("username: ").strip() - if not user.valid_username(args.username): - config.logger.error("Invalid username: usernames may only contain alphanumeric characters, dashes, and underscores") - return -1 - if user.get_user_by_username(args.username) is not None: - config.logger.error("Invalid username: username is already taken") - return -1 - if not args.displayname: - args.displayname = args.username - if not args.email: - args.email = input("email: ").strip() - if not user.valid_email(args.email): - config.logger.error("Invalid email") - return -1 - # Create user - new_user, tmp_pw = user.create_user(args.username, args.displayname, args.email) - with config.json_ro(new_user.config) as js: - print(json.dumps(js, indent=2)) - print("Username: {}\nUser ID: {}\nPassword: {}".format(args.username, new_user.uid, tmp_pw)) - -@add_argument("--id", help="id of user to delete") -def command_user_delete(args): - import os - - import config - - user_path = config.prepend('user', args.uid) - if not os.path.isdir(user_path): - config.logger.error("No user with that id") - return -1 - else: - os.remove(user_path) - with config.json_rw('user', 'index.json') as j: - if args.uid in j: - del j[uid] - -@no_argument -def command_user_list(args): - """Lists users""" - import os - - import config - - user_dirs = os.listdir(config.prepend('user')) - users = [] - for uid in user_dirs: - if uid == "index.json": continue - with config.json_ro('user', uid, 'config.json') as user: - users.append(user) - users.sort(key=lambda u: u['username']) - for user in users: - print("{0} {1} ({2})".format(user['uid'], user['displayname'], user['username'])) - -@add_argument("--username", help="The user to change password for") -def command_user_passwd(args): - """Set a user's password""" - import getpass - import os - - import config - import user - - if not args.username: - args.username = input("Username: ") - uid = user.uid_from_username(args.username) - if uid is None: - config.logger.error("No user with username '{}'".format(args.username)) - return -1 - u = user.user_from_uid(uid) - pw = getpass.getpass("Password: ") - u.set_password(pw) - -@add_argument("--foo", action="store_true") -def command_dump(args): - """Dumps the global config or the config for the given lexicon""" - import json - import config - print(json.dumps(config.GLOBAL_CONFIG, indent=2)) - +from cli.server import * +from cli.user import * diff --git a/amanuensis/cli/helpers.py b/amanuensis/cli/helpers.py new file mode 100644 index 0000000..9cbac0f --- /dev/null +++ b/amanuensis/cli/helpers.py @@ -0,0 +1,30 @@ +# Standard library imports +from argparse import ArgumentParser as AP +from functools import wraps + +# These function wrappers allow us to use the same function for executing a +# command and for configuring it. This keeps command arg configuration close to +# where the command is defined and allows the main parser to use the same +# function to both set up and execute commands. + +def add_argument(*args, **kwargs): + """Passes the given args and kwargs to subparser.add_argument""" + def argument_adder(command): + second_layer = command.__dict__.get('wrapper', False) + @wraps(command) + def augmented_command(cmd_args): + if type(cmd_args) is AP: + cmd_args.add_argument(*args, **kwargs) + if type(cmd_args) is not AP or second_layer: + command(cmd_args) + augmented_command.__dict__['wrapper'] = True + return augmented_command + return argument_adder + +def no_argument(command): + """Noops for subparsers""" + @wraps(command) + def augmented_command(cmd_args): + if type(cmd_args) is not AP: + command(cmd_args) + return augmented_command diff --git a/amanuensis/cli/lexicon.py b/amanuensis/cli/lexicon.py new file mode 100644 index 0000000..11539f4 --- /dev/null +++ b/amanuensis/cli/lexicon.py @@ -0,0 +1 @@ +from cli.helpers import add_argument, no_argument diff --git a/amanuensis/cli/server.py b/amanuensis/cli/server.py new file mode 100644 index 0000000..6311012 --- /dev/null +++ b/amanuensis/cli/server.py @@ -0,0 +1,87 @@ +from cli.helpers import add_argument, no_argument + +@add_argument("--update", action="store_true", help="Refresh an existing config directory") +def command_init(args): + """Initialize a config directory at the directory given by --config-dir""" + from collections import OrderedDict + import fcntl + import json + import os + import pkg_resources + + cfd = args.config_dir + # Create the directory if it doesn't exist. + if not os.path.isdir(cfd): + os.mkdir(cfd) + # The directory should be empty if we're not updating an existing one. + if len(os.listdir(cfd)) > 0 and not args.update: + print("Directory {} is not empty".format(cfd)) + return -1 + + # Update or create global config. + def_cfg = pkg_resources.resource_stream(__name__, "resources/default_config.json") + if args.update and os.path.isfile(os.path.join(cfd, "config.json")): + with open(os.path.join(cfd, "config.json"), 'r+', encoding='utf8') as cfg_file: + fcntl.lockf(cfg_file, fcntl.LOCK_EX) + old_cfg = json.load(cfg_file, object_pairs_hook=OrderedDict) + new_cfg = json.load(def_cfg, object_pairs_hook=OrderedDict) + merged = {} + for key in new_cfg: + merged[key] = old_cfg[key] if key in old_cfg else new_cfg[key] + if key not in old_cfg: + print("Added key '{}' to config".format(key)) + for key in old_cfg: + if key not in new_cfg: + print("Config contains unknown key '{}'".format(key)) + merged[key] = old_cfg[key] + cfg_file.seek(0) + json.dump(merged, cfg_file, allow_nan=False, indent='\t') + cfg_file.truncate() + fcntl.lockf(cfg_file, fcntl.LOCK_UN) + else: + with open(os.path.join(cfd, "config.json"), 'wb') as f: + f.write(def_cfg.read()) + # Ensure pidfile exists. + if not os.path.isfile(os.path.join(cfd, "pid")): + with open(os.path.join(cfd, "pid"), 'w') as f: + f.write(str(os.getpid())) + # Ensure subdirs exist. + if not os.path.isdir(os.path.join(cfd, "lexicon")): + os.mkdir(os.path.join(cfd, "lexicon")) + if not os.path.isdir(os.path.join(cfd, "user")): + os.mkdir(os.path.join(cfd, "user")) + if not os.path.isfile(os.path.join(cfd, 'user', 'index.json')): + with open(os.path.join(cfd, 'user', 'index.json'), 'w') as f: + json.dump({}, f) + +@no_argument +def command_generate_secret(args): + """Generate a secret key for Flask""" + import os + + import config + + secret_key = os.urandom(32) + with config.json_rw("config.json") as cfg: + cfg['secret_key'] = secret_key.hex() + config.logger.info("Regenerated Flask secret key") + +@add_argument("-a", "--address", default="127.0.0.1") +@add_argument("-p", "--port", default="5000") +def command_run(args): + """Runs the default Flask development server""" + import server + import config + + if config.get("secret_key") is None: + config.logger.error("Can't run server without a secret_key. Run generate-secret first") + return -1 + server.app.run(host=args.address, port=args.port) + +@add_argument("--foo", action="store_true") +def command_dump(args): + """Dumps the global config or the config for the given lexicon""" + import json + import config + print(json.dumps(config.GLOBAL_CONFIG, indent=2)) + diff --git a/amanuensis/cli/user.py b/amanuensis/cli/user.py new file mode 100644 index 0000000..dacb46b --- /dev/null +++ b/amanuensis/cli/user.py @@ -0,0 +1,85 @@ +from cli.helpers import add_argument, no_argument + +@add_argument("--username", help="User's login handle") +@add_argument("--displayname", help="User's publicly displayed name") +@add_argument("--email", help="User's email") +def command_user_add(args): + """Creates a user""" + import json + + import user + import config + + # Verify or query parameters + if not args.username: + args.username = input("username: ").strip() + if not user.valid_username(args.username): + config.logger.error("Invalid username: usernames may only contain alphanumeric characters, dashes, and underscores") + return -1 + if user.get_user_by_username(args.username) is not None: + config.logger.error("Invalid username: username is already taken") + return -1 + if not args.displayname: + args.displayname = args.username + if not args.email: + args.email = input("email: ").strip() + if not user.valid_email(args.email): + config.logger.error("Invalid email") + return -1 + # Create user + new_user, tmp_pw = user.create_user(args.username, args.displayname, args.email) + with config.json_ro(new_user.config) as js: + print(json.dumps(js, indent=2)) + print("Username: {}\nUser ID: {}\nPassword: {}".format(args.username, new_user.uid, tmp_pw)) + +@add_argument("--id", help="id of user to delete") +def command_user_delete(args): + import os + + import config + + user_path = config.prepend('user', args.uid) + if not os.path.isdir(user_path): + config.logger.error("No user with that id") + return -1 + else: + os.remove(user_path) + with config.json_rw('user', 'index.json') as j: + if args.uid in j: + del j[uid] + +@no_argument +def command_user_list(args): + """Lists users""" + import os + + import config + + user_dirs = os.listdir(config.prepend('user')) + users = [] + for uid in user_dirs: + if uid == "index.json": continue + with config.json_ro('user', uid, 'config.json') as user: + users.append(user) + users.sort(key=lambda u: u['username']) + for user in users: + print("{0} {1} ({2})".format(user['uid'], user['displayname'], user['username'])) + +@add_argument("--username", help="The user to change password for") +def command_user_passwd(args): + """Set a user's password""" + import getpass + import os + + import config + import user + + if not args.username: + args.username = input("Username: ") + uid = user.uid_from_username(args.username) + if uid is None: + config.logger.error("No user with username '{}'".format(args.username)) + return -1 + u = user.user_from_uid(uid) + pw = getpass.getpass("Password: ") + u.set_password(pw)