diff --git a/amanuensis/backend/lexicon.py b/amanuensis/backend/lexicon.py index 18bbe84..726b360 100644 --- a/amanuensis/backend/lexicon.py +++ b/amanuensis/backend/lexicon.py @@ -3,6 +3,7 @@ Lexicon query interface """ import re +from typing import Sequence from sqlalchemy import select, func @@ -52,3 +53,8 @@ def create( db.session.add(new_lexicon) db.session.commit() return new_lexicon + + +def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]: + """Get all lexicons.""" + return db(select(Lexicon)).scalars() diff --git a/amanuensis/backend/user.py b/amanuensis/backend/user.py index 5411079..4ff2264 100644 --- a/amanuensis/backend/user.py +++ b/amanuensis/backend/user.py @@ -3,7 +3,7 @@ User query interface """ import re -import uuid +from typing import Sequence from sqlalchemy import select, func @@ -67,3 +67,8 @@ def create( db.session.add(new_user) db.session.commit() return new_user + + +def get_all_users(db: DbContext) -> Sequence[User]: + """Get all users.""" + return db(select(User)).scalars() diff --git a/amanuensis/cli/__init__.py b/amanuensis/cli/__init__.py index 6723b18..7f50868 100644 --- a/amanuensis/cli/__init__.py +++ b/amanuensis/cli/__init__.py @@ -1,72 +1,98 @@ -# -# The cli module must not import other parts of the application at the module -# level. This is because most other modules depend on the config module. The -# config module may depend on __main__'s commandline parsing to locate config -# files, and __main__'s commandline parsing requires importing (but not -# executing) the functions in the cli module. Thus, cli functions must only -# import the config module inside the various command methods, which are only -# run after commandline parsing has already occurred. -# +from argparse import ArgumentParser +import logging +import logging.config + +import amanuensis.cli.admin +import amanuensis.cli.lexicon +import amanuensis.cli.user -def server_commands(commands={}): - if commands: - return commands - import amanuensis.cli.server - for name, func in vars(amanuensis.cli.server).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands[name] = func - return commands +LOGGING_CONFIG = { + "version": 1, + "formatters": { + "fmt_basic": { + "validate": True, + "format": "%(message)s", + }, + "fmt_detailed": { + "validate": True, + "format": "%(asctime)s %(levelname)s %(message)s", + }, + }, + "handlers": { + "hnd_stderr": { + "class": "logging.StreamHandler", + "level": "INFO", + "formatter": "fmt_basic", + }, + }, + "loggers": { + __name__: { + "level": "DEBUG", + "handlers": ["hnd_stderr"], + }, + }, +} -def lexicon_commands(commands={}): - if commands: - return commands - import amanuensis.cli.lexicon - for name, func in vars(amanuensis.cli.lexicon).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands["lexicon-" + name] = func - return commands +def add_subcommand(subparsers, module) -> None: + """Add a cli submodule's commands as a subparser.""" + # Get the command information from the module + command_name: str = getattr(module, "COMMAND_NAME") + command_help: str = getattr(module, "COMMAND_HELP") + if not command_name and command_help: + return + + # Add the subparser for the command and set a default action + command_parser: ArgumentParser = subparsers.add_parser( + command_name, help=command_help + ) + command_parser.set_defaults(func=lambda args: command_parser.print_usage()) + + # Add all subcommands in the command module + subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND") + for name, obj in vars(module).items(): + if name.startswith("command_"): + # Hyphenate subcommand names + sc_name: str = name[8:].replace("_", "-") + # Only the first line of the subcommand function docstring is used + sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0] + + # Add the command and any arguments defined by its decorators + subcommand: ArgumentParser = subcommands.add_parser( + sc_name, help=sc_help, description=obj.__doc__ + ) + subcommand.set_defaults(func=obj) + for args, kwargs in obj.__dict__.get("add_argument", []): + subcommand.add_argument(*args, **kwargs) -def user_commands(commands={}): - if commands: - return commands - import amanuensis.cli.user - for name, func in vars(amanuensis.cli.user).items(): - if name.startswith("command_"): - name = name[8:].replace("_", "-") - commands["user-" + name] = func - return commands +def init_logger(args): + """Set up logging based on verbosity args""" + if args.verbose: + handler = LOGGING_CONFIG["handlers"]["hnd_stderr"] + handler["formatter"] = "fmt_detailed" + handler["level"] = "DEBUG" + logging.config.dictConfig(LOGGING_CONFIG) -def get_commands(): - return {**server_commands(), **lexicon_commands(), **user_commands()} +def main(): + """CLI entry point""" + # Set up the top-level parser + parser = ArgumentParser() + parser.set_defaults( + parser=parser, + func=lambda args: parser.print_usage(), + ) + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + # Add commands from cli submodules + subparsers = parser.add_subparsers(metavar="COMMAND") + add_subcommand(subparsers, amanuensis.cli.admin) + add_subcommand(subparsers, amanuensis.cli.lexicon) + add_subcommand(subparsers, amanuensis.cli.user) -def cmd_desc(func): - return ((func.__doc__ or "").strip() or '\n').splitlines()[0] - - -def describe_commands(): - longest = max(map(len, server_commands().keys())) - server_desc = "General commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in server_commands().items() - ])) - - longest = max(map(len, lexicon_commands().keys())) - lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in lexicon_commands().items() - ])) - - longest = max(map(len, user_commands().keys())) - user_desc = "User commands:\n{}\n".format("\n".join([ - " {1:<{0}} : {2}".format(longest, name, cmd_desc(func)) - for name, func in user_commands().items() - ])) - - return "\n".join([server_desc, lexicon_desc, user_desc]) + # Parse args and execute the desired action + args = parser.parse_args() + init_logger(args) + args.func(args) diff --git a/amanuensis/cli/admin.py b/amanuensis/cli/admin.py new file mode 100644 index 0000000..c7e7f30 --- /dev/null +++ b/amanuensis/cli/admin.py @@ -0,0 +1,60 @@ +import collections +import json +import logging +import os + +from amanuensis.db import DbContext + +from .helpers import add_argument + + +COMMAND_NAME = "admin" +COMMAND_HELP = "Interact with Amanuensis." + +LOG = logging.getLogger(__name__) + + +@add_argument( + "path", metavar="DB_PATH", help="Path to where the database should be created" +) +@add_argument("--force", "-f", action="store_true", help="Overwrite existing database") +@add_argument("--verbose", "-v", action="store_true", help="Enable db echo") +def command_init_db(args) -> int: + """ + Initialize the Amanuensis database. + """ + # Check if force is required + if not args.force and os.path.exists(args.path): + args.parser.error(f"{args.path} already exists and --force was not specified") + + # Initialize the database + db_uri = f"sqlite:///{os.path.abspath(args.path)}" + LOG.info(f"Creating database at {db_uri}") + db = DbContext(db_uri, debug=args.verbose) + db.create_all() + + LOG.info("Done") + return 0 + + +@add_argument("path", metavar="CONFIG_PATH", help="Path to the config file") +def command_secret_key(args) -> int: + """ + Generate a Flask secret key. + + The Flask server will not run unless a secret key has + been generated. + """ + # Load the json config + with open(args.path, mode="r", encoding="utf8") as f: + config = json.load(f, object_pairs_hook=collections.OrderedDict) + + # Set the secret key to a new random string + config["SECRET_KEY"] = os.urandom(32).hex() + + # Write the config back out + with open(args.path, mode="w", encoding="utf8") as f: + json.dump(config, f, indent=2) + + LOG.info("Regenerated Flask secret key") + return 0 diff --git a/amanuensis/cli/helpers.py b/amanuensis/cli/helpers.py index 1d2a136..6bf3c31 100644 --- a/amanuensis/cli/helpers.py +++ b/amanuensis/cli/helpers.py @@ -1,209 +1,20 @@ -# Standard library imports -from argparse import ArgumentParser -from functools import wraps -from json.decoder import JSONDecodeError -from logging import getLogger -from sys import exc_info - -logger = getLogger(__name__) - - -# -# The add_argument and no_argument function wrappers allow the same -# function to both configure a command and execute it. This keeps -# command argument configuration close to where the command is defined -# and reduces the number of things the main parser has to handle. -# +""" +Helpers for cli commands. +""" def add_argument(*args, **kwargs): - """Passes the given args and kwargs to subparser.add_argument""" + """Defines an argument to a cli command.""" - def argument_adder(command): - @wraps(command) - def augmented_command(cmd_args): - # Add this wrapper's command in the parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*args, **kwargs) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None + def argument_adder(command_func): + """Decorator function for storing parser args on the function.""" - # Pass through transparently in the execute pass - return command(cmd_args) + # Store the kw/args in the function dictionary + add_args = command_func.__dict__.get("add_argument", []) + add_args.append((args, kwargs)) + command_func.__dict__["add_argument"] = add_args - # Mark the command as wrapped so control passes through - augmented_command.__dict__['wrapper'] = True - return augmented_command + # Return the same function + return command_func - return argument_adder - - -def no_argument(command): - """Noops for subparsers""" - @wraps(command) - def augmented_command(cmd_args): - # Noop in the parser pass - if isinstance(cmd_args, ArgumentParser): - return None - # Pass through in the execute pass - return command(cmd_args) - - return augmented_command - - -# -# Many commands require specifying a lexicon or user to operate on, so -# the requires_lexicon and requires_user wrappers replace @add_argument -# as well as automatically create the model for the object from the -# provided identifier. -# - - -LEXICON_ARGS = ['--lexicon'] -LEXICON_KWARGS = { - 'metavar': 'LEXICON', - 'dest': 'lexicon', - 'help': 'Specify a user to operate on'} - - -def requires_lexicon(command): - @wraps(command) - def augmented_command(cmd_args): - # Add lexicon argument in parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None - - # Verify lexicon argument in execute pass - val = getattr(cmd_args, 'lexicon', None) - if not val: - logger.error("Missing --lexicon argument") - return -1 - try: - model_factory = cmd_args.model_factory - cmd_args.lexicon = model_factory.lexicon(val) - except Exception: - ex_type, value, tb = exc_info() - logger.error( - f'Loading lexicon "{val}" failed with ' - f'{ex_type.__name__}: {value}') - return -1 - return command(cmd_args) - - augmented_command.__dict__['wrapper'] = True - return augmented_command - - -USER_ARGS = ['--user'] -USER_KWARGS = { - 'metavar': 'USER', - 'dest': 'user', - 'help': 'Specify a user to operate on'} - - -def requires_user(command): - @wraps(command) - def augmented_command(cmd_args): - # Add user argument in parser pass - if isinstance(cmd_args, ArgumentParser): - cmd_args.add_argument(*USER_ARGS, **USER_KWARGS) - # If there are more command wrappers, pass through to them - if command.__dict__.get('wrapper', False): - command(cmd_args) - # Parser pass doesn't return a value - return None - - # Verify user argument in execute pass - val = getattr(cmd_args, "user", None) - if not val: - logger.error("Missing --user argument") - return -1 - try: - model_factory = cmd_args.model_factory - cmd_args.user = model_factory.user(val) - except Exception: - ex_type, value, tb = exc_info() - logger.error( - f'Loading user "{val}" failed with ' - f'{ex_type.__name__}: {value}') - return -1 - return command(cmd_args) - - augmented_command.__dict__['wrapper'] = True - return augmented_command - - -# Wrapper for aliasing commands -def alias(cmd_alias): - """Adds an alias to the function dictionary""" - def aliaser(command): - aliases = command.__dict__.get('aliases', []) - aliases.append(cmd_alias) - command.__dict__['aliases'] = aliases - return command - return aliaser - - -# Helpers for common command tasks - -CONFIG_GET_ROOT_VALUE = object() - - -def config_get(cfg, pathspec): - """ - Performs config --get for a given config - - cfg is from a `with json_ro` context - path is the full pathspec, unsplit - """ - import json - - if pathspec is CONFIG_GET_ROOT_VALUE: - path = [] - else: - path = pathspec.split(".") - for spec in path: - if spec not in cfg: - logger.error("Path not found: {}".format(pathspec)) - return -1 - cfg = cfg.get(spec) - print(json.dumps(cfg, indent=2)) - return 0 - - -def config_set(obj_id, cfg, set_tuple): - """ - Performs config --set for a given config - - config is from a "with json_rw" context - set_tuple is a tuple of the pathspec and the value - """ - import json - pathspec, value = set_tuple - if not pathspec: - logger.error("Path must be non-empty") - path = pathspec.split('.') - try: - value = json.loads(value) - except JSONDecodeError: - pass # Leave value as string - for spec in path[:-1]: - if spec not in cfg: - logger.error("Path not found") - return -1 - cfg = cfg.get(spec) - key = path[-1] - if key not in cfg: - logger.error("Path not found") - return -1 - old_value = cfg[key] - cfg[key] = value - logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value)) - return 0 + return argument_adder diff --git a/amanuensis/cli/lexicon.py b/amanuensis/cli/lexicon.py index 6bcc6b0..92fc7ab 100644 --- a/amanuensis/cli/lexicon.py +++ b/amanuensis/cli/lexicon.py @@ -1,324 +1,30 @@ -# Standard library imports import logging -# Module imports -from amanuensis.config import RootConfigDirectoryContext -from amanuensis.models import LexiconModel, UserModel - -from .helpers import ( - add_argument, no_argument, requires_lexicon, requires_user, alias, - config_get, config_set, CONFIG_GET_ROOT_VALUE) - -logger = logging.getLogger(__name__) - -# -# CRUD commands -# +from .helpers import add_argument + + +COMMAND_NAME = "lexicon" +COMMAND_HELP = "Interact with lexicons." + +LOG = logging.getLogger(__name__) -@alias('lc') -@add_argument("--name", required=True, help="The name of the new lexicon") -@requires_user -@add_argument("--prompt", help="The lexicon's prompt") def command_create(args): - """ - Create a lexicon - - The specified user will be the editor. A newly created created lexicon is - not open for joining and requires additional configuration before it is - playable. The editor should ensure that all settings are as desired before - opening the lexicon for player joins. - """ - # Module imports - from amanuensis.lexicon import valid_name, create_lexicon - - root: RootConfigDirectoryContext = args.root - - # Verify arguments - if not valid_name(args.name): - logger.error(f'Lexicon name contains illegal characters: "{args.name}"') - return -1 - with root.lexicon.read_index() as index: - if args.name in index.keys(): - logger.error(f'A lexicon with name "{args.name}" already exists') - return -1 - - # Perform command - create_lexicon(root, args.name, args.user) - - # Output already logged by create_lexicon - return 0 + """ + Create a lexicon. + """ + raise NotImplementedError() -@alias('ld') -@requires_lexicon -@add_argument("--purge", action="store_true", help="Delete the lexicon's data") def command_delete(args): - """ - Delete a lexicon and optionally its data - """ - raise NotImplementedError() - # # Module imports - # from amanuensis.config import logger - # from amanuensis.lexicon.manage import delete_lexicon - - # # Perform command - # delete_lexicon(args.lexicon, args.purge) - - # # Output - # logger.info('Deleted lexicon "{}"'.format(args.lexicon.name)) - # return 0 + """ + Delete a lexicon. + """ + raise NotImplementedError() -@alias('ll') -@no_argument def command_list(args): - """ - List all lexicons and their statuses - """ - raise NotImplementedError() - # # Module imports - # from amanuensis.lexicon.manage import get_all_lexicons - - # # Execute command - # lexicons = get_all_lexicons() - - # # Output - # statuses = [] - # for lex in lexicons: - # statuses.append("{0.lid} {0.name} ({1})".format(lex, lex.status())) - # for s in statuses: - # print(s) - # return 0 - - -@alias('ln') -@requires_lexicon -@add_argument("--get", - metavar="PATHSPEC", - dest="get", - nargs="?", - const=CONFIG_GET_ROOT_VALUE, - help="Get the value of a config key") -@add_argument("--set", - metavar=("PATHSPEC", "VALUE"), - dest="set", - nargs=2, - help="Set the value of a config key") -def command_config(args): - """ - Interact with a lexicon's config - """ - lexicon: LexiconModel = args.lexicon - - # Verify arguments - if args.get and args.set: - logger.error("Specify one of --get and --set") - return -1 - - # Execute command - if args.get: - config_get(lexicon.cfg, args.get) - - if args.set: - with lexicon.ctx.edit_config() as cfg: - config_set(lexicon.lid, cfg, args.set) - - # config_* functions handle output - return 0 - -# -# Player/character commands -# - - -@alias('lpa') -@requires_lexicon -@requires_user -def command_player_add(args): - """ - Add a player to a lexicon - """ - lexicon: LexiconModel = args.lexicon - user: UserModel = args.user - - # Module imports - from amanuensis.lexicon import add_player_to_lexicon - - # Verify arguments - if user.uid in lexicon.cfg.join.joined: - logger.error(f'"{user.cfg.username}" is already a player ' - f'in "{lexicon.cfg.name}"') - return -1 - - # Perform command - add_player_to_lexicon(user, lexicon) - - # Output - logger.info(f'Added user "{user.cfg.username}" to ' - f'lexicon "{lexicon.cfg.name}"') - return 0 - - -@alias('lpr') -@requires_lexicon -@requires_user -def command_player_remove(args): - """ - Remove a player from a lexicon - - Removing a player dissociates them from any characters - they control but does not delete any character data. - """ - raise NotImplementedError() - # # Module imports - # from amanuensis.lexicon.manage import remove_player - - # # Verify arguments - # if not args.user.in_lexicon(args.lexicon): - # logger.error('"{0.username}" is not a player in lexicon "{1.name}"' - # ''.format(args.user, args.lexicon)) - # return -1 - # if args.user.id == args.lexicon.editor: - # logger.error("Can't remove the editor of a lexicon") - # return -1 - - # # Perform command - # remove_player(args.lexicon, args.user) - - # # Output - # logger.info('Removed "{0.username}" from lexicon "{1.name}"'.format( - # args.user, args.lexicon)) - # return 0 - - -@alias('lpl') -@requires_lexicon -def command_player_list(args): - """ - List all players in a lexicon - """ - raise NotImplementedError() - # import json - # # Module imports - # from amanuensis.user import UserModel - - # # Perform command - # players = list(map( - # lambda uid: UserModel.by(uid=uid).username, - # args.lexicon.join.joined)) - - # # Output - # print(json.dumps(players, indent=2)) - # return 0 - - -@alias('lcc') -@requires_lexicon -@requires_user -@add_argument("--charname", required=True, help="The character's name") -def command_char_create(args): - """ - Create a character for a lexicon - - The specified player will be set as the character's player. - """ - lexicon: LexiconModel = args.lexicon - user: UserModel = args.user - - # Module imports - from amanuensis.lexicon import create_character_in_lexicon - - # Verify arguments - if user.uid not in lexicon.cfg.join.joined: - logger.error('"{0.username}" is not a player in lexicon "{1.name}"' - ''.format(user.cfg, lexicon.cfg)) - return -1 - - # Perform command - create_character_in_lexicon(user, lexicon, args.charname) - - # Output - logger.info(f'Created character "{args.charname}" for "{user.cfg.username}"' - f' in "{lexicon.cfg.name}"') - return 0 - - -@alias('lcd') -@requires_lexicon -@add_argument("--charname", required=True, help="The character's name") -def command_char_delete(args): - """ - Delete a character from a lexicon - - Deleting a character dissociates them from any content - they have contributed rather than deleting it. - """ - raise NotImplementedError() - # # Module imports - # from amanuensis.lexicon import LexiconModel - # from amanuensis.lexicon.manage import delete_character - - # # Verify arguments - # lex = LexiconModel.by(name=args.lexicon) - # if lex is None: - # logger.error("Could not find lexicon '{}'".format(args.lexicon)) - # return -1 - - # # Internal call - # delete_character(lex, args.charname) - # return 0 - - -@alias('lcl') -@requires_lexicon -def command_char_list(args): - """ - List all characters in a lexicon - """ - raise NotImplementedError() - # import json - # # Module imports - # from amanuensis.lexicon import LexiconModel - - # # Verify arguments - # lex = LexiconModel.by(name=args.lexicon) - # if lex is None: - # logger.error("Could not find lexicon '{}'".format(args.lexicon)) - # return -1 - - # # Internal call - # print(json.dumps(lex.character, indent=2)) - # return 0 - -# -# Procedural commands -# - - -@alias('lpt') -@requires_lexicon -@add_argument("--as-deadline", - action="store_true", - help="Notifies players of the publish result") -@add_argument("--force", - action="store_true", - help="Publish all approved articles, regardless of other checks") -def command_publish_turn(args): - """ - Publishes the current turn of a lexicon - - The --as-deadline flag is intended to be used only by the scheduled publish - attempts controlled by the publish.deadlines setting. - - The --force flag bypasses the publish.quorum and publish.block_on_ready - settings. - """ - # Module imports - from amanuensis.lexicon import attempt_publish - - # Internal call - result = attempt_publish(args.lexicon) - - if not result: - logger.error('Publish failed, check lexicon log') + """ + List all lexicons and their statuses. + """ + raise NotImplementedError() diff --git a/amanuensis/cli/server.py b/amanuensis/cli/server.py deleted file mode 100644 index d337f4f..0000000 --- a/amanuensis/cli/server.py +++ /dev/null @@ -1,120 +0,0 @@ -import logging -import os - -from amanuensis.config import RootConfigDirectoryContext - -from .helpers import ( - add_argument, - no_argument, - alias, - config_get, - config_set, - CONFIG_GET_ROOT_VALUE) - -logger = logging.getLogger(__name__) - - -@alias('i') -@add_argument("--refresh", - action="store_true", - help="Refresh an existing config directory") -def command_init(args): - """ - Initialize a config directory at --config-dir - - A clean config directory will contain a config.json, a - lexicon config directory, and a user config directory. - - Refreshing an existing directory will add keys to the global config that - are present in the default configs. Users and lexicons that are missing - from the indexes will be deleted, and stale index entries will be removed. - """ - # Module imports - from amanuensis.config.init import create_config_dir - - # Verify arguments - if args.refresh and not os.path.isdir(args.config_dir): - print("Error: couldn't find directory '{}'".format(args.config_dir)) - - # Internal call - create_config_dir(args.config_dir, args.refresh) - logger.info(f'Initialized config dir at {args.config_dir}') - return 0 - - -@alias('gs') -@no_argument -def command_generate_secret(args): - """ - Generate a Flask secret key - - The Flask server will not run unless a secret key has - been generated. - """ - root: RootConfigDirectoryContext = args.root - secret_key: bytes = os.urandom(32) - with root.edit_config() as cfg: - cfg.secret_key = secret_key.hex() - logger.info("Regenerated Flask secret key") - return 0 - - -@alias('r') -@add_argument("-a", "--address", default="127.0.0.1") -@add_argument("-p", "--port", default="5000") -@add_argument("--debug", action="store_true") -def command_run(args): - """ - Run the default Flask server - - The default Flask server is not secure, and should - only be used for development. - """ - from amanuensis.server import get_app - - root: RootConfigDirectoryContext = args.root - - with root.read_config() as cfg: - if cfg.secret_key is None: - logger.error("Can't run server without a secret_key. " - "Run generate-secet first.") - return -1 - - get_app(root).run(host=args.address, port=args.port, debug=args.debug) - return 0 - - -@alias('n') -@add_argument("--get", - metavar="PATHSPEC", - dest="get", - nargs="?", - const=CONFIG_GET_ROOT_VALUE, - help="Get the value of a config key") -@add_argument("--set", - metavar=("PATHSPEC", "VALUE"), - dest="set", - nargs=2, - help="Set the value of a config key") -def command_config(args): - """ - Interact with the global config - - PATHSPEC is a path into the config object formatted as - a dot-separated sequence of keys. - """ - root: RootConfigDirectoryContext = args.root - - if args.get and args.set: - logger.error("Specify one of --get and --set") - return -1 - - if args.get: - with root.read_config() as cfg: - config_get(cfg, args.get) - - if args.set: - with root.edit_config() as cfg: - config_set("config", cfg, args.set) - - return 0 diff --git a/amanuensis/cli/user.py b/amanuensis/cli/user.py index 0f12486..91d16ce 100644 --- a/amanuensis/cli/user.py +++ b/amanuensis/cli/user.py @@ -1,158 +1,37 @@ -# Standard library imports -import getpass import logging -# import shutil -# Module imports -from amanuensis.models import UserModel - -from .helpers import ( - add_argument, - no_argument, - requires_user, - alias, - config_get, - config_set, - CONFIG_GET_ROOT_VALUE) - -logger = logging.getLogger(__name__) +from .helpers import add_argument + + +COMMAND_NAME = "user" +COMMAND_HELP = "Interact with users." + +LOG = logging.getLogger(__name__) -@alias('uc') -@add_argument("--username", required=True, help="Name of user to create") -@add_argument("--email", help="User's email") -@add_argument("--displayname", help="User's publicly displayed name") def command_create(args): - """ - Create a user - """ - # Module imports - from amanuensis.user import ( - valid_username, valid_email, create_user) - - # Verify arguments - if not valid_username(args.username): - logger.error("Invalid username: usernames may only contain alphanumer" - "ic characters, dashes, and underscores") - return -1 - if not args.displayname: - args.displayname = args.username - if args.email and not valid_email(args.email): - logger.error("Invalid email") - return -1 - try: - existing_user = args.model_factory.user(args.username) - if existing_user is not None: - logger.error("Invalid username: username is already taken") - return -1 - except Exception: - pass # User doesn't already exist, good to go - - # Perform command - new_user, tmp_pw = create_user( - args.root, - args.model_factory, - args.username, - args.displayname, - args.email) - - # Output - print(tmp_pw) - return 0 + """ + Create a user. + """ + raise NotImplementedError() -@alias('ud') -@requires_user def command_delete(args): - """ - Delete a user - """ - raise NotImplementedError() - # # Module imports - # from amanuensis.config import logger, prepend, json_rw - - # # Perform command - # user_path = prepend('user', args.user.id) - # shutil.rmtree(user_path) - # with json_rw('user', 'index.json') as index: - # del index[args.user.username] - - # # TODO resolve user id references in all games - - # # Output - # logger.info("Deleted user {0.username} ({0.id})".format(args.user)) - # return 0 + """ + Delete a user. + """ + raise NotImplementedError() -@alias('ul') -@no_argument def command_list(args): - """List all users""" - raise NotImplementedError() - # # Module imports - # from amanuensis.config import prepend, json_ro - # from amanuensis.user import UserModel - - # # Perform command - # users = [] - # with json_ro('user', 'index.json') as index: - # for username, uid in index.items(): - # users.append(UserModel.by(uid=uid)) - - # # Output - # users.sort(key=lambda u: u.username) - # for user in users: - # print("{0.id} {0.displayname} ({0.username})".format(user)) - # return 0 + """ + List all users. + """ + raise NotImplementedError() -@alias('un') -@requires_user -@add_argument( - "--get", metavar="PATHSPEC", dest="get", - nargs="?", const=CONFIG_GET_ROOT_VALUE, help="Get the value of a config key") -@add_argument( - "--set", metavar=("PATHSPEC", "VALUE"), dest="set", - nargs=2, help="Set the value of a config key") -def command_config(args): - """ - Interact with a user's config - """ - user: UserModel = args.user - - # Verify arguments - if args.get and args.set: - logger.error("Specify one of --get and --set") - return -1 - - # Perform command - if args.get: - config_get(user.cfg, args.get) - - if args.set: - with user.ctx.edit_config() as cfg: - config_set(user.uid, cfg, args.set) - - # Output - return 0 - - -@alias('up') -@requires_user -@add_argument("--password", help="The password to set. Used for scripting; " - "not recommended for general use") def command_passwd(args): - """ - Set a user's password - """ - user: UserModel = args.user - - # Verify arguments - password: str = args.password or getpass.getpass("Password: ") - - # Perform command - user.set_password(password) - - # Output - logger.info('Updated password for {}'.format(user.cfg.username)) - return 0 + """ + Set a user's password. + """ + raise NotImplementedError() diff --git a/amanuensis/config.py b/amanuensis/config.py new file mode 100644 index 0000000..59d9c37 --- /dev/null +++ b/amanuensis/config.py @@ -0,0 +1,46 @@ +from argparse import ArgumentParser +from typing import Optional +import os + + +class AmanuensisConfig: + """Base config type. Defines config keys for subclasses to override.""" + + # If CONFIG_FILE is defined, the config file it points to may override + # config values defined on the config object itself. + CONFIG_FILE: Optional[str] = None + STATIC_ROOT: Optional[str] = "../resources" + SECRET_KEY: Optional[str] = "secret" + DATABASE_URI: Optional[str] = "sqlite:///:memory:" + TESTING: bool = False + + +class EnvironmentConfig(AmanuensisConfig): + """Loads config values from environment variables.""" + + CONFIG_FILE = os.environ.get("AMANUENSIS_CONFIG_FILE", AmanuensisConfig.CONFIG_FILE) + STATIC_ROOT = os.environ.get("AMANUENSIS_STATIC_ROOT", AmanuensisConfig.STATIC_ROOT) + SECRET_KEY = os.environ.get("AMANUENSIS_SECRET_KEY", AmanuensisConfig.SECRET_KEY) + DATABASE_URI = os.environ.get( + "AMANUENSIS_DATABASE_URI", AmanuensisConfig.DATABASE_URI + ) + TESTING = os.environ.get("AMANUENSIS_TESTING", "").lower() in ("true", "1") + + +class CommandLineConfig(AmanuensisConfig): + """Loads config values from command line arguments.""" + + def __init__(self) -> None: + parser = ArgumentParser() + parser.add_argument("--config-file", default=AmanuensisConfig.CONFIG_FILE) + parser.add_argument("--static-root", default=AmanuensisConfig.STATIC_ROOT) + parser.add_argument("--secret-key", default=AmanuensisConfig.SECRET_KEY) + parser.add_argument("--database-uri", default=AmanuensisConfig.DATABASE_URI) + parser.add_argument("--debug", action="store_true") + args = parser.parse_args() + + self.CONFIG_FILE = args.config_file + self.STATIC_ROOT = args.static_root + self.SECRET_KEY = args.secret_key + self.DATABASE_URI = args.database_uri + self.TESTING = args.debug diff --git a/amanuensis/config/__init__.py b/amanuensis/config/__init__.py deleted file mode 100644 index e202d47..0000000 --- a/amanuensis/config/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Module imports -from .dict import AttrOrderedDict, ReadOnlyOrderedDict -from .directory import ( - RootConfigDirectoryContext, - UserConfigDirectoryContext, - LexiconConfigDirectoryContext, - is_guid) - -# Environment variable name constants -ENV_SECRET_KEY = "AMANUENSIS_SECRET_KEY" -ENV_CONFIG_DIR = "AMANUENSIS_CONFIG_DIR" -ENV_LOG_FILE = "AMANUENSIS_LOG_FILE" -ENV_LOG_FILE_SIZE = "AMANUENSIS_LOG_FILE_SIZE" -ENV_LOG_FILE_NUM = "AMANUENSIS_LOG_FILE_NUM" - -__all__ = [ - AttrOrderedDict.__name__, - ReadOnlyOrderedDict.__name__, - RootConfigDirectoryContext.__name__, - UserConfigDirectoryContext.__name__, - LexiconConfigDirectoryContext.__name__, - is_guid.__name__, -] diff --git a/amanuensis/config/context.py b/amanuensis/config/context.py deleted file mode 100644 index 19ee588..0000000 --- a/amanuensis/config/context.py +++ /dev/null @@ -1,82 +0,0 @@ -""" -`with` context managers for mediating config file access. -""" -# Standard library imports -import fcntl -import json - -# Application imports -from .dict import AttrOrderedDict, ReadOnlyOrderedDict - - -class open_lock(): - """A context manager that opens a file with the specified file lock""" - def __init__(self, path, mode, lock_type): - self.fd = open(path, mode, encoding='utf8') - fcntl.lockf(self.fd, lock_type) - - def __enter__(self): - return self.fd - - def __exit__(self, exc_type, exc_value, traceback): - fcntl.lockf(self.fd, fcntl.LOCK_UN) - self.fd.close() - - -class open_sh(open_lock): - """A context manager that opens a file with a shared lock""" - def __init__(self, path, mode): - super().__init__(path, mode, fcntl.LOCK_SH) - - -class open_ex(open_lock): - """A context manager that opens a file with an exclusive lock""" - def __init__(self, path, mode): - super().__init__(path, mode, fcntl.LOCK_EX) - - -class json_ro(open_sh): - """ - A context manager that opens a file in a shared, read-only mode. - The contents of the file are read as JSON and returned as a read- - only OrderedDict. - """ - def __init__(self, path): - super().__init__(path, 'r') - self.config = None - - def __enter__(self) -> ReadOnlyOrderedDict: - self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict) - return self.config - - -class json_rw(open_ex): - """ - A context manager that opens a file with an exclusive lock. The - file mode defaults to r+, which requires that the file exist. The - file mode can be set to w+ to create a new file by setting the new - kwarg in the ctor. The contents of the file are read as JSON and - returned in an AttrOrderedDict. Any changes to the context dict - will be written out to the file when the context manager exits, - unless an exception is raised before exiting. - """ - def __init__(self, path, new=False): - mode = 'w+' if new else 'r+' - super().__init__(path, mode) - self.config = None - self.new = new - - def __enter__(self) -> AttrOrderedDict: - if not self.new: - self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict) - else: - self.config = AttrOrderedDict() - return self.config - - def __exit__(self, exc_type, exc_value, traceback): - # Only write the new value out if there wasn't an exception - if not exc_type: - self.fd.seek(0) - json.dump(self.config, self.fd, allow_nan=False, indent='\t') - self.fd.truncate() - super().__exit__(exc_type, exc_value, traceback) diff --git a/amanuensis/config/dict.py b/amanuensis/config/dict.py deleted file mode 100644 index 09ddc5c..0000000 --- a/amanuensis/config/dict.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Dictionary classes used to represent JSON config files in memory. -""" -from collections import OrderedDict - -from amanuensis.errors import ReadOnlyError - - -class AttrOrderedDict(OrderedDict): - """ - An OrderedDict with attribute access to known keys and explicit - creation of new keys. - """ - def __getattr__(self, key): - if key not in self: - raise AttributeError(key) - return self[key] - - def __setattr__(self, key, value): - if key not in self: - raise AttributeError(key) - self[key] = value - - def new(self, key, value): - """Setter for adding new keys""" - if key in self: - raise KeyError("Key already exists: '{}'".format(key)) - self[key] = value - - -class ReadOnlyOrderedDict(OrderedDict): - """ - An OrderedDict that cannot be modified with attribute access to - known keys. - """ - def __readonly__(self, *args, **kwargs): - raise ReadOnlyError("Cannot modify a ReadOnlyOrderedDict") - - def __init__(self, *args, **kwargs): - super(ReadOnlyOrderedDict, self).__init__(*args, **kwargs) - self.__setitem__ = self.__readonly__ - self.__delitem__ = self.__readonly__ - self.pop = self.__readonly__ - self.popitem = self.__readonly__ - self.clear = self.__readonly__ - self.update = self.__readonly__ - self.setdefault = self.__readonly__ - - def __getattr__(self, key): - if key not in self: - raise AttributeError(key) - return self[key] diff --git a/amanuensis/config/directory.py b/amanuensis/config/directory.py deleted file mode 100644 index 806cde8..0000000 --- a/amanuensis/config/directory.py +++ /dev/null @@ -1,160 +0,0 @@ -""" -Config directory abstractions that encapsulate path munging and context -manager usage. -""" -import os -import re -from typing import Iterable - -from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError - -from .context import json_ro, json_rw - - -def is_guid(s: str) -> bool: - return bool(re.match(r'[0-9a-z]{32}', s.lower())) - - -class ConfigDirectoryContext(): - """ - Base class for CRUD operations on config files in a config - directory. - """ - def __init__(self, path: str): - self.path: str = path - if not os.path.isdir(self.path): - raise MissingConfigError(path) - - def new(self, filename) -> json_rw: - """ - Creates a JSON file that doesn't already exist. - """ - if not filename.endswith('.json'): - filename = f'{filename}.json' - fpath: str = os.path.join(self.path, filename) - if os.path.isfile(fpath): - raise ConfigAlreadyExistsError(fpath) - return json_rw(fpath, new=True) - - def read(self, filename) -> json_ro: - """ - Loads a JSON file in read-only mode. - """ - if not filename.endswith('.json'): - filename = f'{filename}.json' - fpath: str = os.path.join(self.path, filename) - if not os.path.isfile(fpath): - raise MissingConfigError(fpath) - return json_ro(fpath) - - def edit(self, filename, create=False) -> json_rw: - """ - Loads a JSON file in write mode. - """ - if not filename.endswith('.json'): - filename = f'{filename}.json' - fpath: str = os.path.join(self.path, filename) - if not create and not os.path.isfile(fpath): - raise MissingConfigError(fpath) - return json_rw(fpath, new=create) - - def delete(self, filename) -> None: - """Deletes a file.""" - if not filename.endswith('.json'): - filename = f'{filename}.json' - fpath: str = os.path.join(self.path, filename) - if not os.path.isfile(fpath): - raise MissingConfigError(fpath) - os.remove(fpath) - - def ls(self) -> Iterable[str]: - """Lists all files in this directory.""" - filenames: Iterable[str] = os.listdir(self.path) - return filenames - - -class ConfigFileConfigDirectoryContext(ConfigDirectoryContext): - """ - Config directory with a `config.json`. - """ - def __init__(self, path: str): - super().__init__(path) - config_path = os.path.join(self.path, 'config.json') - if not os.path.isfile(config_path): - raise MissingConfigError(config_path) - - def edit_config(self) -> json_rw: - """rw context manager for this object's config file.""" - return self.edit('config') - - def read_config(self) -> json_ro: - """ro context manager for this object's config file.""" - return self.read('config') - - -class IndexDirectoryContext(ConfigDirectoryContext): - """ - A lookup layer for getting config directory contexts for lexicon - or user directories. - """ - def __init__(self, path: str, cdc_type: type): - super().__init__(path) - index_path = os.path.join(self.path, 'index.json') - if not os.path.isfile(index_path): - raise MissingConfigError(index_path) - self.cdc_type = cdc_type - - def __getitem__(self, key: str) -> ConfigFileConfigDirectoryContext: - """ - Returns a context to the given item. key is treated as the - item's id if it's a guid string, otherwise it's treated as - the item's indexed name and run through the index first. - """ - if not is_guid(key): - with self.read_index() as index: - iid = index.get(key) - if not iid: - raise MissingConfigError(key) - key = iid - return self.cdc_type(os.path.join(self.path, key)) - - def edit_index(self) -> json_rw: - return self.edit('index') - - def read_index(self) -> json_ro: - return self.read('index') - - -class RootConfigDirectoryContext(ConfigFileConfigDirectoryContext): - """ - Context for the config directory with links to the lexicon and - user contexts. - """ - def __init__(self, path): - super().__init__(path) - self.lexicon: IndexDirectoryContext = IndexDirectoryContext( - os.path.join(self.path, 'lexicon'), - LexiconConfigDirectoryContext) - self.user: IndexDirectoryContext = IndexDirectoryContext( - os.path.join(self.path, 'user'), - UserConfigDirectoryContext) - - -class LexiconConfigDirectoryContext(ConfigFileConfigDirectoryContext): - """ - A config context for a lexicon's config directory. - """ - def __init__(self, path): - super().__init__(path) - self.draft: ConfigDirectoryContext = ConfigDirectoryContext( - os.path.join(self.path, 'draft')) - self.src: ConfigDirectoryContext = ConfigDirectoryContext( - os.path.join(self.path, 'src')) - self.article: ConfigDirectoryContext = ConfigDirectoryContext( - os.path.join(self.path, 'article')) - - -class UserConfigDirectoryContext(ConfigFileConfigDirectoryContext): - """ - A config context for a user's config directory. - """ diff --git a/amanuensis/config/init.py b/amanuensis/config/init.py deleted file mode 100644 index 571b8b7..0000000 --- a/amanuensis/config/init.py +++ /dev/null @@ -1,96 +0,0 @@ -# Standard library imports -from collections import OrderedDict -import fcntl -import json -import os -import shutil - -# Module imports -from amanuensis.resources import get_stream - -from .context import json_ro, json_rw - - -def create_config_dir(config_dir, refresh=False): - """ - Create or refresh a config directory - """ - - def prepend(*path): - joined = os.path.join(*path) - if not joined.startswith(config_dir): - joined = os.path.join(config_dir, joined) - return joined - - # Create the directory if it doesn't exist. - if not os.path.isdir(config_dir): - os.mkdir(config_dir) - - # The directory should be empty if we're not updating an existing one. - if len(os.listdir(config_dir)) > 0 and not refresh: - print("Directory {} is not empty".format(config_dir)) - return -1 - - # Update or create global config. - def_cfg = get_stream("global.json") - global_config_path = prepend("config.json") - if refresh and os.path.isfile(global_config_path): - # We need to write an entirely different ordereddict to the config - # file, so we mimic the config.context functionality manually. - with open(global_config_path, 'r+', encoding='utf8') as cfg_file: - fcntl.lockf(cfg_file, fcntl.LOCK_EX) - old_cfg = json.load(cfg_file, object_pairs_hook=OrderedDict) - new_cfg = json.load(def_cfg, object_pairs_hook=OrderedDict) - merged = {} - for key in new_cfg: - merged[key] = old_cfg[key] if key in old_cfg else new_cfg[key] - if key not in old_cfg: - print("Added key '{}' to config".format(key)) - for key in old_cfg: - if key not in new_cfg: - print("Config contains unknown key '{}'".format(key)) - merged[key] = old_cfg[key] - cfg_file.seek(0) - json.dump(merged, cfg_file, allow_nan=False, indent='\t') - cfg_file.truncate() - fcntl.lockf(cfg_file, fcntl.LOCK_UN) - else: - with open(prepend("config.json"), 'wb') as f: - f.write(def_cfg.read()) - - # Ensure lexicon subdir exists. - if not os.path.isdir(prepend("lexicon")): - os.mkdir(prepend("lexicon")) - if not os.path.isfile(prepend("lexicon", "index.json")): - with open(prepend("lexicon", "index.json"), 'w') as f: - json.dump({}, f) - - # Ensure user subdir exists. - if not os.path.isdir(prepend("user")): - os.mkdir(prepend("user")) - if not os.path.isfile(prepend('user', 'index.json')): - with open(prepend('user', 'index.json'), 'w') as f: - json.dump({}, f) - - if refresh: - for dir_name in ('lexicon', 'user'): - # Clean up unindexed folders - with json_ro(prepend(dir_name, 'index.json')) as index: - known = list(index.values()) - entries = os.listdir(prepend(dir_name)) - for dir_entry in entries: - if dir_entry == "index.json": - continue - if dir_entry in known: - continue - print("Removing unindexed folder: '{}/{}'" - .format(dir_name, dir_entry)) - shutil.rmtree(prepend(dir_name, dir_entry)) - - # Remove orphaned index listings - with json_rw(prepend(dir_name, 'index.json')) as index: - for name, entry in index.items(): - if not os.path.isdir(prepend(dir_name, entry)): - print("Removing stale {} index entry '{}: {}'" - .format(dir_name, name, entry)) - del index[name] diff --git a/amanuensis/db/database.py b/amanuensis/db/database.py index 254a488..0fb68f3 100644 --- a/amanuensis/db/database.py +++ b/amanuensis/db/database.py @@ -3,8 +3,12 @@ Database connection setup """ from sqlalchemy import create_engine, MetaData, event from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import scoped_session -from sqlalchemy.orm import sessionmaker +from sqlalchemy.orm import scoped_session, sessionmaker + +try: + from greenlet import getcurrent as get_ident +except ImportError: + from threading import get_ident # Define naming conventions for generated constraints @@ -34,7 +38,9 @@ class DbContext: cursor.close() # Create a thread-safe session factory - self.session = scoped_session(sessionmaker(bind=self.engine)) + self.session = scoped_session( + sessionmaker(bind=self.engine), scopefunc=get_ident + ) def __call__(self, *args, **kwargs): """Provides shortcut access to session.execute.""" diff --git a/amanuensis/db/models.py b/amanuensis/db/models.py index 471e05a..0b1642f 100644 --- a/amanuensis/db/models.py +++ b/amanuensis/db/models.py @@ -13,7 +13,6 @@ from sqlalchemy import ( ForeignKey, Integer, String, - Table, Text, text, TypeDecorator, @@ -234,6 +233,14 @@ class Lexicon(ModelBase): content_rules = relationship("ArticleContentRule", back_populates="lexicon") posts = relationship("Post", back_populates="lexicon") + ####################### + # Derived information # + ####################### + + @property + def full_title(self: "Lexicon") -> str: + return self.title if self.title else f"Lexicon {self.name}" + class Membership(ModelBase): """ diff --git a/amanuensis/log/__init__.py b/amanuensis/log/__init__.py deleted file mode 100644 index 1352ce0..0000000 --- a/amanuensis/log/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .setup import init_logging - -__all__ = [member.__name__ for member in [ - init_logging -]] diff --git a/amanuensis/log/setup.py b/amanuensis/log/setup.py deleted file mode 100644 index 6c7d24d..0000000 --- a/amanuensis/log/setup.py +++ /dev/null @@ -1,45 +0,0 @@ -import logging -import logging.handlers - - -basic_formatter = logging.Formatter( - fmt='[{levelname}] {message}', - style='{') -detailed_formatter = logging.Formatter( - fmt='[{asctime}] [{levelname}:{filename}:{lineno}] {message}', - style='{') -basic_console_handler = logging.StreamHandler() -basic_console_handler.setLevel(logging.INFO) -basic_console_handler.setFormatter(basic_formatter) -detailed_console_handler = logging.StreamHandler() -detailed_console_handler.setLevel(logging.DEBUG) -detailed_console_handler.setFormatter(detailed_formatter) - - -def get_file_handler(filename: str) -> logging.Handler: - handler = logging.handlers.RotatingFileHandler( - filename=filename, - maxBytes=1000000, - backupCount=10, - delay=True, - encoding='utf8', - ) - handler.setLevel(logging.DEBUG) - handler.setFormatter(detailed_formatter) - return handler - - -def init_logging(verbose: bool, log_filename: str): - """ - Initializes the Amanuensis logger settings - """ - logger = logging.getLogger("amanuensis") - if log_filename: - logger.addHandler(get_file_handler(log_filename)) - logger.setLevel(logging.DEBUG) - elif verbose: - logger.addHandler(detailed_console_handler) - logger.setLevel(logging.DEBUG) - else: - logger.addHandler(basic_console_handler) - logger.setLevel(logging.INFO) diff --git a/amanuensis/server/__init__.py b/amanuensis/server/__init__.py index 41fe451..e144471 100644 --- a/amanuensis/server/__init__.py +++ b/amanuensis/server/__init__.py @@ -1,46 +1,68 @@ +import json import os -from flask import Flask +from flask import Flask, g -from amanuensis.config import RootConfigDirectoryContext, ENV_CONFIG_DIR -from amanuensis.models import ModelFactory -from .auth import get_login_manager, bp_auth -from .helpers import register_custom_filters -from .home import bp_home -from .lexicon import bp_lexicon -from .session import bp_session +from amanuensis.config import AmanuensisConfig, CommandLineConfig +from amanuensis.db import DbContext +import amanuensis.server.home -def get_app(root: RootConfigDirectoryContext) -> Flask: - # Flask app init - with root.read_config() as cfg: - app = Flask( - __name__, - template_folder='.', - static_folder=cfg.static_root - ) - app.secret_key = bytes.fromhex(cfg.secret_key) - app.config['root'] = root - app.config['model_factory'] = ModelFactory(root) - app.jinja_options['trim_blocks'] = True - app.jinja_options['lstrip_blocks'] = True - register_custom_filters(app) +def get_app( + config: AmanuensisConfig, + db: DbContext = None, +) -> Flask: + """Application factory""" + # Create the Flask object + app = Flask(__name__, template_folder=".", static_folder=config.STATIC_ROOT) - # Flask-Login init - login_manager = get_login_manager(root) - login_manager.init_app(app) + # Load keys from the config object + app.config.from_object(config) - # Blueprint inits - app.register_blueprint(bp_auth) - app.register_blueprint(bp_home) - app.register_blueprint(bp_lexicon) - app.register_blueprint(bp_session) + # If a config file is now specified, also load keys from there + if config_path := app.config.get("CONFIG_FILE", None): + app.config.from_file(os.path.abspath(config_path), json.load) - return app + # Assert that all required config values are now set + for config_key in ("SECRET_KEY", "DATABASE_URI"): + if not app.config.get(config_key): + raise Exception(f"{config_key} must be defined") + + # Create the database context, if one wasn't already given + if db is None: + db = DbContext(app.config["DATABASE_URI"]) + + # Make the database connection available to requests via g + def db_setup(): + g.db = db + + app.before_request(db_setup) + + # Tear down the session on request teardown + def db_teardown(response_or_exc): + db.session.remove() + + app.teardown_appcontext(db_teardown) + + # Configure jinja options + app.jinja_options.update(trim_blocks=True, lstrip_blocks=True) + + # Set up Flask-Login + # TODO + + # Register blueprints + app.register_blueprint(amanuensis.server.home.bp) + + def test(): + return "Hello, world!" + + app.route("/")(test) + + return app -def default(): - cwd = os.getcwd() - config_dir = os.environ.get(ENV_CONFIG_DIR, "amanuensis") - root = RootConfigDirectoryContext(os.path.join(cwd, config_dir)) - return get_app(root) +def run(): + """Run the server, populating the config from the command line.""" + config = CommandLineConfig() + app = get_app(config) + app.run(debug=app.testing) diff --git a/amanuensis/server/home/__init__.py b/amanuensis/server/home/__init__.py index 68477ca..162efa5 100644 --- a/amanuensis/server/home/__init__.py +++ b/amanuensis/server/home/__init__.py @@ -1,64 +1,64 @@ -from flask import Blueprint, render_template, redirect, url_for, current_app -from flask_login import login_required, current_user +from flask import Blueprint, render_template, g -from amanuensis.config import RootConfigDirectoryContext -from amanuensis.lexicon import create_lexicon, load_all_lexicons -from amanuensis.models import UserModel, ModelFactory -from amanuensis.server.helpers import admin_required +# from flask import Blueprint, render_template, redirect, url_for, current_app +# from flask_login import login_required, current_user -from .forms import LexiconCreateForm +import amanuensis.backend.user as userq +import amanuensis.backend.lexicon as lexiq -bp_home = Blueprint('home', __name__, - url_prefix='/home', - template_folder='.') +# from amanuensis.config import RootConfigDirectoryContext +# from amanuensis.lexicon import create_lexicon, load_all_lexicons +# from amanuensis.models import UserModel, ModelFactory +# from amanuensis.server.helpers import admin_required + +# from .forms import LexiconCreateForm + +bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".") -@bp_home.route('/', methods=['GET']) -def home(): - root: RootConfigDirectoryContext = current_app.config['root'] - user: UserModel = current_user - user_lexicons = [] - public_lexicons = [] - for lexicon in load_all_lexicons(root): - if user.uid in lexicon.cfg.join.joined: - user_lexicons.append(lexicon) - elif lexicon.cfg.join.public: - public_lexicons.append(lexicon) - return render_template( - 'home.root.jinja', - user_lexicons=user_lexicons, - public_lexicons=public_lexicons) +# @bp.get("/") +# def home(): +# Show lexicons that are visible to the current user +# return "TODO" +# user_lexicons = [] +# public_lexicons = [] +# for lexicon in load_all_lexicons(root): +# if user.uid in lexicon.cfg.join.joined: +# user_lexicons.append(lexicon) +# elif lexicon.cfg.join.public: +# public_lexicons.append(lexicon) +# return render_template( +# 'home.root.jinja', +# user_lexicons=user_lexicons, +# public_lexicons=public_lexicons) -@bp_home.route('/admin/', methods=['GET']) -@login_required -@admin_required +@bp.get("/admin/") +# @login_required +# @admin_required def admin(): - root: RootConfigDirectoryContext = current_app.config['root'] - users = [] - lexicons = list(load_all_lexicons(root)) - return render_template('home.admin.jinja', users=users, lexicons=lexicons) + return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq) -@bp_home.route("/admin/create/", methods=['GET', 'POST']) -@login_required -@admin_required -def admin_create(): - form = LexiconCreateForm() +# @bp_home.route("/admin/create/", methods=['GET', 'POST']) +# @login_required +# @admin_required +# def admin_create(): +# form = LexiconCreateForm() - if not form.validate_on_submit(): - # GET or POST with invalid form data - return render_template('home.create.jinja', form=form) +# if not form.validate_on_submit(): +# # GET or POST with invalid form data +# return render_template('home.create.jinja', form=form) - # POST with valid data - root: RootConfigDirectoryContext = current_app.config['root'] - model_factory: ModelFactory = current_app.config['model_factory'] - lexicon_name = form.lexiconName.data - editor_name = form.editorName.data - prompt = form.promptText.data - # Editor's existence was checked by form validators - editor = model_factory.user(editor_name) - lexicon = create_lexicon(root, lexicon_name, editor) - with lexicon.ctx.edit_config() as cfg: - cfg.prompt = prompt - return redirect(url_for('session.session', name=lexicon_name)) +# # POST with valid data +# root: RootConfigDirectoryContext = current_app.config['root'] +# model_factory: ModelFactory = current_app.config['model_factory'] +# lexicon_name = form.lexiconName.data +# editor_name = form.editorName.data +# prompt = form.promptText.data +# # Editor's existence was checked by form validators +# editor = model_factory.user(editor_name) +# lexicon = create_lexicon(root, lexicon_name, editor) +# with lexicon.ctx.edit_config() as cfg: +# cfg.prompt = prompt +# return redirect(url_for('session.session', name=lexicon_name)) diff --git a/amanuensis/server/home/home.admin.jinja b/amanuensis/server/home/home.admin.jinja index 2b8ecff..854f4a7 100644 --- a/amanuensis/server/home/home.admin.jinja +++ b/amanuensis/server/home/home.admin.jinja @@ -3,17 +3,18 @@ {% block title %}Admin | Amanuensis{% endblock %} {% block header %}
Users:
-{% for user in users %} +{% for user in userq.get_all_users(db) %} {{ macros.dashboard_user_item(user) }} {% endfor %}Lexicons:
-{% for lexicon in lexicons %} +{% for lexicon in lexiq.get_all_lexicons(db) %} {{ macros.dashboard_lexicon_item(lexicon) }} {% endfor %} {% endblock %} diff --git a/amanuensis/server/macros.jinja b/amanuensis/server/macros.jinja index 9cbf0b0..7909f7b 100644 --- a/amanuensis/server/macros.jinja +++ b/amanuensis/server/macros.jinja @@ -1,45 +1,47 @@ {% macro dashboard_lexicon_item(lexicon) %} -- - - Lexicon {{ lexicon.cfg.name }} - - [{{ lexicon.status.capitalize() }}] -
-{{ lexicon.cfg.prompt }}
- {% if current_user.is_authenticated %} -- {% - if current_user.uid in lexicon.cfg.join.joined - or current_user.cfg.is_admin - %} - Editor: {{ lexicon.cfg.editor|user_attr('username') }} / - Players: - {% for uid in lexicon.cfg.join.joined %} - {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} - {% endfor %} - ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) - {% else %} - Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} - {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} - / - Join game - - {% endif %} - {% endif %} -
- {% endif %} +{% set status = "completed" if lexicon.completed else "ongoing" if lexicon.started else "unstarted" %} ++ + + {{ lexicon.full_title }} + + [{{ lexicon.status.capitalize() }}] +
+{{ lexicon.prompt }}
+ {# {% if current_user.is_authenticated %} #} ++ {# TODO #} + {# {% + if current_user.uid in lexicon.cfg.join.joined + or current_user.cfg.is_admin + %} #} + Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} / + Players: + {# {% for uid in lexicon.cfg.join.joined %} #} + {# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #} + {# {% endfor %} #} + {# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #} + {# {% else %} #} + {# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #} + {# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #} + {# / #} + {# Join game #} + {# #} + {# {% endif %} #} + {# {% endif %} #} +
+ {# {% endif %} #}- {{ user.cfg.username }} - {% if user.cfg.username != user.cfg.displayname %} / {{ user.cfg.displayname }}{% endif %} - ({{user.uid}}) -
-Last activity: {{ user.cfg.last_activity|asdate }} — Last login: {{ user.cfg.last_login|asdate }}
++ {{ user.username }} + {% if user.username != user.display_name %} / {{ user.display_name }}{% endif %} + (id #{{user.id}}) +
+Last activity: {{ user.last_activity }} — Last login: {{ user.last_login }}