Compare commits
5 Commits
e23019bff6
...
324788f9a1
Author | SHA1 | Date |
---|---|---|
Tim Van Baak | 324788f9a1 | |
Tim Van Baak | 32a0948deb | |
Tim Van Baak | 64973c4873 | |
Tim Van Baak | 3890d4b11e | |
Tim Van Baak | f6c0b9509d |
|
@ -1,72 +1,94 @@
|
||||||
#
|
from argparse import ArgumentParser
|
||||||
# The cli module must not import other parts of the application at the module
|
import logging
|
||||||
# level. This is because most other modules depend on the config module. The
|
import logging.config
|
||||||
# config module may depend on __main__'s commandline parsing to locate config
|
|
||||||
# files, and __main__'s commandline parsing requires importing (but not
|
import amanuensis.cli.admin
|
||||||
# executing) the functions in the cli module. Thus, cli functions must only
|
|
||||||
# import the config module inside the various command methods, which are only
|
|
||||||
# run after commandline parsing has already occurred.
|
|
||||||
#
|
|
||||||
|
|
||||||
|
|
||||||
def server_commands(commands={}):
|
LOGGING_CONFIG = {
|
||||||
if commands:
|
"version": 1,
|
||||||
return commands
|
"formatters": {
|
||||||
import amanuensis.cli.server
|
"fmt_basic": {
|
||||||
for name, func in vars(amanuensis.cli.server).items():
|
"validate": True,
|
||||||
|
"format": "%(message)s",
|
||||||
|
},
|
||||||
|
"fmt_detailed": {
|
||||||
|
"validate": True,
|
||||||
|
"format": "%(asctime)s %(levelname)s %(message)s"
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"handlers": {
|
||||||
|
"hnd_stderr": {
|
||||||
|
"class": "logging.StreamHandler",
|
||||||
|
"level": "INFO",
|
||||||
|
"formatter": "fmt_basic",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"loggers": {
|
||||||
|
__name__: {
|
||||||
|
"level": "DEBUG",
|
||||||
|
"handlers": ["hnd_stderr"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def add_subcommand(subparsers, module) -> None:
|
||||||
|
"""Add a cli submodule's commands as a subparser."""
|
||||||
|
# Get the command information from the module
|
||||||
|
command_name: str = getattr(module, "COMMAND_NAME")
|
||||||
|
command_help: str = getattr(module, "COMMAND_HELP")
|
||||||
|
if not command_name and command_help:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Add the subparser for the command and set a default action
|
||||||
|
command_parser: ArgumentParser = subparsers.add_parser(
|
||||||
|
command_name, help=command_help
|
||||||
|
)
|
||||||
|
command_parser.set_defaults(func=lambda args: command_parser.print_usage())
|
||||||
|
|
||||||
|
# Add all subcommands in the command module
|
||||||
|
subcommands = command_parser.add_subparsers(metavar="SUBCOMMAND")
|
||||||
|
for name, obj in vars(module).items():
|
||||||
if name.startswith("command_"):
|
if name.startswith("command_"):
|
||||||
name = name[8:].replace("_", "-")
|
# Hyphenate subcommand names
|
||||||
commands[name] = func
|
sc_name: str = name[8:].replace("_", "-")
|
||||||
return commands
|
# Only the first line of the subcommand function docstring is used
|
||||||
|
sc_help = ((obj.__doc__ or "").strip() or "\n").splitlines()[0]
|
||||||
|
|
||||||
|
# Add the command and any arguments defined by its decorators
|
||||||
|
subcommand: ArgumentParser = subcommands.add_parser(
|
||||||
|
sc_name, help=sc_help, description=obj.__doc__
|
||||||
|
)
|
||||||
|
subcommand.set_defaults(func=obj)
|
||||||
|
for args, kwargs in obj.__dict__.get("add_argument", []):
|
||||||
|
subcommand.add_argument(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
def lexicon_commands(commands={}):
|
def init_logger(args):
|
||||||
if commands:
|
"""Set up logging based on verbosity args"""
|
||||||
return commands
|
if (args.verbose):
|
||||||
import amanuensis.cli.lexicon
|
handler = LOGGING_CONFIG["handlers"]["hnd_stderr"]
|
||||||
for name, func in vars(amanuensis.cli.lexicon).items():
|
handler["formatter"] = "fmt_detailed"
|
||||||
if name.startswith("command_"):
|
handler["level"] = "DEBUG"
|
||||||
name = name[8:].replace("_", "-")
|
logging.config.dictConfig(LOGGING_CONFIG)
|
||||||
commands["lexicon-" + name] = func
|
|
||||||
return commands
|
|
||||||
|
|
||||||
|
|
||||||
def user_commands(commands={}):
|
def main():
|
||||||
if commands:
|
"""CLI entry point"""
|
||||||
return commands
|
# Set up the top-level parser
|
||||||
import amanuensis.cli.user
|
parser = ArgumentParser()
|
||||||
for name, func in vars(amanuensis.cli.user).items():
|
parser.set_defaults(
|
||||||
if name.startswith("command_"):
|
parser=parser,
|
||||||
name = name[8:].replace("_", "-")
|
func=lambda args: parser.print_usage(),
|
||||||
commands["user-" + name] = func
|
)
|
||||||
return commands
|
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
||||||
|
|
||||||
|
# Add commands from cli submodules
|
||||||
|
subparsers = parser.add_subparsers(metavar="COMMAND")
|
||||||
|
add_subcommand(subparsers, amanuensis.cli.admin)
|
||||||
|
|
||||||
def get_commands():
|
# Parse args and execute the desired action
|
||||||
return {**server_commands(), **lexicon_commands(), **user_commands()}
|
args = parser.parse_args()
|
||||||
|
init_logger(args)
|
||||||
|
args.func(args)
|
||||||
def cmd_desc(func):
|
|
||||||
return ((func.__doc__ or "").strip() or '\n').splitlines()[0]
|
|
||||||
|
|
||||||
|
|
||||||
def describe_commands():
|
|
||||||
longest = max(map(len, server_commands().keys()))
|
|
||||||
server_desc = "General commands:\n{}\n".format("\n".join([
|
|
||||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
|
||||||
for name, func in server_commands().items()
|
|
||||||
]))
|
|
||||||
|
|
||||||
longest = max(map(len, lexicon_commands().keys()))
|
|
||||||
lexicon_desc = "Lexicon commands:\n{}\n".format("\n".join([
|
|
||||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
|
||||||
for name, func in lexicon_commands().items()
|
|
||||||
]))
|
|
||||||
|
|
||||||
longest = max(map(len, user_commands().keys()))
|
|
||||||
user_desc = "User commands:\n{}\n".format("\n".join([
|
|
||||||
" {1:<{0}} : {2}".format(longest, name, cmd_desc(func))
|
|
||||||
for name, func in user_commands().items()
|
|
||||||
]))
|
|
||||||
|
|
||||||
return "\n".join([server_desc, lexicon_desc, user_desc])
|
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
import collections
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
|
||||||
|
from amanuensis.db import DbContext
|
||||||
|
|
||||||
|
from .helpers import add_argument
|
||||||
|
|
||||||
|
|
||||||
|
COMMAND_NAME = "admin"
|
||||||
|
COMMAND_HELP = "Interact with Amanuensis."
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@add_argument("path", metavar="DB_PATH", help="Path to where the database should be created")
|
||||||
|
@add_argument("--force", "-f", action="store_true", help="Overwrite existing database")
|
||||||
|
@add_argument("--verbose", "-v", action="store_true", help="Enable db echo")
|
||||||
|
def command_init_db(args) -> int:
|
||||||
|
"""
|
||||||
|
Initialize the Amanuensis database.
|
||||||
|
"""
|
||||||
|
# Check if force is required
|
||||||
|
if not args.force and os.path.exists(args.path):
|
||||||
|
args.parser.error(f"{args.path} already exists and --force was not specified")
|
||||||
|
|
||||||
|
# Initialize the database
|
||||||
|
db_uri = f"sqlite:///{os.path.abspath(args.path)}"
|
||||||
|
LOG.info(f"Creating database at {db_uri}")
|
||||||
|
db = DbContext(db_uri, debug=args.verbose)
|
||||||
|
db.create_all()
|
||||||
|
|
||||||
|
LOG.info("Done")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
@add_argument("path", metavar="CONFIG_PATH", help="Path to the config file")
|
||||||
|
def command_secret_key(args) -> int:
|
||||||
|
"""
|
||||||
|
Generate a Flask secret key.
|
||||||
|
|
||||||
|
The Flask server will not run unless a secret key has
|
||||||
|
been generated.
|
||||||
|
"""
|
||||||
|
# Load the json config
|
||||||
|
with open(args.path, mode="r", encoding="utf8") as f:
|
||||||
|
config = json.load(f, object_pairs_hook=collections.OrderedDict)
|
||||||
|
|
||||||
|
# Set the secret key to a new random string
|
||||||
|
config["SECRET_KEY"] = os.urandom(32).hex()
|
||||||
|
|
||||||
|
# Write the config back out
|
||||||
|
with open(args.path, mode="w", encoding="utf8") as f:
|
||||||
|
json.dump(config, f, indent=2)
|
||||||
|
|
||||||
|
LOG.info("Regenerated Flask secret key")
|
||||||
|
return 0
|
|
@ -1,209 +1,20 @@
|
||||||
# Standard library imports
|
"""
|
||||||
from argparse import ArgumentParser
|
Helpers for cli commands.
|
||||||
from functools import wraps
|
"""
|
||||||
from json.decoder import JSONDecodeError
|
|
||||||
from logging import getLogger
|
|
||||||
from sys import exc_info
|
|
||||||
|
|
||||||
logger = getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
# The add_argument and no_argument function wrappers allow the same
|
|
||||||
# function to both configure a command and execute it. This keeps
|
|
||||||
# command argument configuration close to where the command is defined
|
|
||||||
# and reduces the number of things the main parser has to handle.
|
|
||||||
#
|
|
||||||
|
|
||||||
|
|
||||||
def add_argument(*args, **kwargs):
|
def add_argument(*args, **kwargs):
|
||||||
"""Passes the given args and kwargs to subparser.add_argument"""
|
"""Defines an argument to a cli command."""
|
||||||
|
|
||||||
def argument_adder(command):
|
def argument_adder(command_func):
|
||||||
@wraps(command)
|
"""Decorator function for storing parser args on the function."""
|
||||||
def augmented_command(cmd_args):
|
|
||||||
# Add this wrapper's command in the parser pass
|
|
||||||
if isinstance(cmd_args, ArgumentParser):
|
|
||||||
cmd_args.add_argument(*args, **kwargs)
|
|
||||||
# If there are more command wrappers, pass through to them
|
|
||||||
if command.__dict__.get('wrapper', False):
|
|
||||||
command(cmd_args)
|
|
||||||
# Parser pass doesn't return a value
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Pass through transparently in the execute pass
|
# Store the kw/args in the function dictionary
|
||||||
return command(cmd_args)
|
add_args = command_func.__dict__.get("add_argument", [])
|
||||||
|
add_args.append((args, kwargs))
|
||||||
|
command_func.__dict__["add_argument"] = add_args
|
||||||
|
|
||||||
# Mark the command as wrapped so control passes through
|
# Return the same function
|
||||||
augmented_command.__dict__['wrapper'] = True
|
return command_func
|
||||||
return augmented_command
|
|
||||||
|
|
||||||
return argument_adder
|
return argument_adder
|
||||||
|
|
||||||
|
|
||||||
def no_argument(command):
|
|
||||||
"""Noops for subparsers"""
|
|
||||||
@wraps(command)
|
|
||||||
def augmented_command(cmd_args):
|
|
||||||
# Noop in the parser pass
|
|
||||||
if isinstance(cmd_args, ArgumentParser):
|
|
||||||
return None
|
|
||||||
# Pass through in the execute pass
|
|
||||||
return command(cmd_args)
|
|
||||||
|
|
||||||
return augmented_command
|
|
||||||
|
|
||||||
|
|
||||||
#
|
|
||||||
# Many commands require specifying a lexicon or user to operate on, so
|
|
||||||
# the requires_lexicon and requires_user wrappers replace @add_argument
|
|
||||||
# as well as automatically create the model for the object from the
|
|
||||||
# provided identifier.
|
|
||||||
#
|
|
||||||
|
|
||||||
|
|
||||||
LEXICON_ARGS = ['--lexicon']
|
|
||||||
LEXICON_KWARGS = {
|
|
||||||
'metavar': 'LEXICON',
|
|
||||||
'dest': 'lexicon',
|
|
||||||
'help': 'Specify a user to operate on'}
|
|
||||||
|
|
||||||
|
|
||||||
def requires_lexicon(command):
|
|
||||||
@wraps(command)
|
|
||||||
def augmented_command(cmd_args):
|
|
||||||
# Add lexicon argument in parser pass
|
|
||||||
if isinstance(cmd_args, ArgumentParser):
|
|
||||||
cmd_args.add_argument(*LEXICON_ARGS, **LEXICON_KWARGS)
|
|
||||||
# If there are more command wrappers, pass through to them
|
|
||||||
if command.__dict__.get('wrapper', False):
|
|
||||||
command(cmd_args)
|
|
||||||
# Parser pass doesn't return a value
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Verify lexicon argument in execute pass
|
|
||||||
val = getattr(cmd_args, 'lexicon', None)
|
|
||||||
if not val:
|
|
||||||
logger.error("Missing --lexicon argument")
|
|
||||||
return -1
|
|
||||||
try:
|
|
||||||
model_factory = cmd_args.model_factory
|
|
||||||
cmd_args.lexicon = model_factory.lexicon(val)
|
|
||||||
except Exception:
|
|
||||||
ex_type, value, tb = exc_info()
|
|
||||||
logger.error(
|
|
||||||
f'Loading lexicon "{val}" failed with '
|
|
||||||
f'{ex_type.__name__}: {value}')
|
|
||||||
return -1
|
|
||||||
return command(cmd_args)
|
|
||||||
|
|
||||||
augmented_command.__dict__['wrapper'] = True
|
|
||||||
return augmented_command
|
|
||||||
|
|
||||||
|
|
||||||
USER_ARGS = ['--user']
|
|
||||||
USER_KWARGS = {
|
|
||||||
'metavar': 'USER',
|
|
||||||
'dest': 'user',
|
|
||||||
'help': 'Specify a user to operate on'}
|
|
||||||
|
|
||||||
|
|
||||||
def requires_user(command):
|
|
||||||
@wraps(command)
|
|
||||||
def augmented_command(cmd_args):
|
|
||||||
# Add user argument in parser pass
|
|
||||||
if isinstance(cmd_args, ArgumentParser):
|
|
||||||
cmd_args.add_argument(*USER_ARGS, **USER_KWARGS)
|
|
||||||
# If there are more command wrappers, pass through to them
|
|
||||||
if command.__dict__.get('wrapper', False):
|
|
||||||
command(cmd_args)
|
|
||||||
# Parser pass doesn't return a value
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Verify user argument in execute pass
|
|
||||||
val = getattr(cmd_args, "user", None)
|
|
||||||
if not val:
|
|
||||||
logger.error("Missing --user argument")
|
|
||||||
return -1
|
|
||||||
try:
|
|
||||||
model_factory = cmd_args.model_factory
|
|
||||||
cmd_args.user = model_factory.user(val)
|
|
||||||
except Exception:
|
|
||||||
ex_type, value, tb = exc_info()
|
|
||||||
logger.error(
|
|
||||||
f'Loading user "{val}" failed with '
|
|
||||||
f'{ex_type.__name__}: {value}')
|
|
||||||
return -1
|
|
||||||
return command(cmd_args)
|
|
||||||
|
|
||||||
augmented_command.__dict__['wrapper'] = True
|
|
||||||
return augmented_command
|
|
||||||
|
|
||||||
|
|
||||||
# Wrapper for aliasing commands
|
|
||||||
def alias(cmd_alias):
|
|
||||||
"""Adds an alias to the function dictionary"""
|
|
||||||
def aliaser(command):
|
|
||||||
aliases = command.__dict__.get('aliases', [])
|
|
||||||
aliases.append(cmd_alias)
|
|
||||||
command.__dict__['aliases'] = aliases
|
|
||||||
return command
|
|
||||||
return aliaser
|
|
||||||
|
|
||||||
|
|
||||||
# Helpers for common command tasks
|
|
||||||
|
|
||||||
CONFIG_GET_ROOT_VALUE = object()
|
|
||||||
|
|
||||||
|
|
||||||
def config_get(cfg, pathspec):
|
|
||||||
"""
|
|
||||||
Performs config --get for a given config
|
|
||||||
|
|
||||||
cfg is from a `with json_ro` context
|
|
||||||
path is the full pathspec, unsplit
|
|
||||||
"""
|
|
||||||
import json
|
|
||||||
|
|
||||||
if pathspec is CONFIG_GET_ROOT_VALUE:
|
|
||||||
path = []
|
|
||||||
else:
|
|
||||||
path = pathspec.split(".")
|
|
||||||
for spec in path:
|
|
||||||
if spec not in cfg:
|
|
||||||
logger.error("Path not found: {}".format(pathspec))
|
|
||||||
return -1
|
|
||||||
cfg = cfg.get(spec)
|
|
||||||
print(json.dumps(cfg, indent=2))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
def config_set(obj_id, cfg, set_tuple):
|
|
||||||
"""
|
|
||||||
Performs config --set for a given config
|
|
||||||
|
|
||||||
config is from a "with json_rw" context
|
|
||||||
set_tuple is a tuple of the pathspec and the value
|
|
||||||
"""
|
|
||||||
import json
|
|
||||||
pathspec, value = set_tuple
|
|
||||||
if not pathspec:
|
|
||||||
logger.error("Path must be non-empty")
|
|
||||||
path = pathspec.split('.')
|
|
||||||
try:
|
|
||||||
value = json.loads(value)
|
|
||||||
except JSONDecodeError:
|
|
||||||
pass # Leave value as string
|
|
||||||
for spec in path[:-1]:
|
|
||||||
if spec not in cfg:
|
|
||||||
logger.error("Path not found")
|
|
||||||
return -1
|
|
||||||
cfg = cfg.get(spec)
|
|
||||||
key = path[-1]
|
|
||||||
if key not in cfg:
|
|
||||||
logger.error("Path not found")
|
|
||||||
return -1
|
|
||||||
old_value = cfg[key]
|
|
||||||
cfg[key] = value
|
|
||||||
logger.info("{}.{}: {} -> {}".format(obj_id, pathspec, old_value, value))
|
|
||||||
return 0
|
|
||||||
|
|
|
@ -1,120 +0,0 @@
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
|
|
||||||
from amanuensis.config import RootConfigDirectoryContext
|
|
||||||
|
|
||||||
from .helpers import (
|
|
||||||
add_argument,
|
|
||||||
no_argument,
|
|
||||||
alias,
|
|
||||||
config_get,
|
|
||||||
config_set,
|
|
||||||
CONFIG_GET_ROOT_VALUE)
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
@alias('i')
|
|
||||||
@add_argument("--refresh",
|
|
||||||
action="store_true",
|
|
||||||
help="Refresh an existing config directory")
|
|
||||||
def command_init(args):
|
|
||||||
"""
|
|
||||||
Initialize a config directory at --config-dir
|
|
||||||
|
|
||||||
A clean config directory will contain a config.json, a
|
|
||||||
lexicon config directory, and a user config directory.
|
|
||||||
|
|
||||||
Refreshing an existing directory will add keys to the global config that
|
|
||||||
are present in the default configs. Users and lexicons that are missing
|
|
||||||
from the indexes will be deleted, and stale index entries will be removed.
|
|
||||||
"""
|
|
||||||
# Module imports
|
|
||||||
from amanuensis.config.init import create_config_dir
|
|
||||||
|
|
||||||
# Verify arguments
|
|
||||||
if args.refresh and not os.path.isdir(args.config_dir):
|
|
||||||
print("Error: couldn't find directory '{}'".format(args.config_dir))
|
|
||||||
|
|
||||||
# Internal call
|
|
||||||
create_config_dir(args.config_dir, args.refresh)
|
|
||||||
logger.info(f'Initialized config dir at {args.config_dir}')
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
@alias('gs')
|
|
||||||
@no_argument
|
|
||||||
def command_generate_secret(args):
|
|
||||||
"""
|
|
||||||
Generate a Flask secret key
|
|
||||||
|
|
||||||
The Flask server will not run unless a secret key has
|
|
||||||
been generated.
|
|
||||||
"""
|
|
||||||
root: RootConfigDirectoryContext = args.root
|
|
||||||
secret_key: bytes = os.urandom(32)
|
|
||||||
with root.edit_config() as cfg:
|
|
||||||
cfg.secret_key = secret_key.hex()
|
|
||||||
logger.info("Regenerated Flask secret key")
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
@alias('r')
|
|
||||||
@add_argument("-a", "--address", default="127.0.0.1")
|
|
||||||
@add_argument("-p", "--port", default="5000")
|
|
||||||
@add_argument("--debug", action="store_true")
|
|
||||||
def command_run(args):
|
|
||||||
"""
|
|
||||||
Run the default Flask server
|
|
||||||
|
|
||||||
The default Flask server is not secure, and should
|
|
||||||
only be used for development.
|
|
||||||
"""
|
|
||||||
from amanuensis.server import get_app
|
|
||||||
|
|
||||||
root: RootConfigDirectoryContext = args.root
|
|
||||||
|
|
||||||
with root.read_config() as cfg:
|
|
||||||
if cfg.secret_key is None:
|
|
||||||
logger.error("Can't run server without a secret_key. "
|
|
||||||
"Run generate-secet first.")
|
|
||||||
return -1
|
|
||||||
|
|
||||||
get_app(root).run(host=args.address, port=args.port, debug=args.debug)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
@alias('n')
|
|
||||||
@add_argument("--get",
|
|
||||||
metavar="PATHSPEC",
|
|
||||||
dest="get",
|
|
||||||
nargs="?",
|
|
||||||
const=CONFIG_GET_ROOT_VALUE,
|
|
||||||
help="Get the value of a config key")
|
|
||||||
@add_argument("--set",
|
|
||||||
metavar=("PATHSPEC", "VALUE"),
|
|
||||||
dest="set",
|
|
||||||
nargs=2,
|
|
||||||
help="Set the value of a config key")
|
|
||||||
def command_config(args):
|
|
||||||
"""
|
|
||||||
Interact with the global config
|
|
||||||
|
|
||||||
PATHSPEC is a path into the config object formatted as
|
|
||||||
a dot-separated sequence of keys.
|
|
||||||
"""
|
|
||||||
root: RootConfigDirectoryContext = args.root
|
|
||||||
|
|
||||||
if args.get and args.set:
|
|
||||||
logger.error("Specify one of --get and --set")
|
|
||||||
return -1
|
|
||||||
|
|
||||||
if args.get:
|
|
||||||
with root.read_config() as cfg:
|
|
||||||
config_get(cfg, args.get)
|
|
||||||
|
|
||||||
if args.set:
|
|
||||||
with root.edit_config() as cfg:
|
|
||||||
config_set("config", cfg, args.set)
|
|
||||||
|
|
||||||
return 0
|
|
|
@ -1,5 +0,0 @@
|
||||||
from .setup import init_logging
|
|
||||||
|
|
||||||
__all__ = [member.__name__ for member in [
|
|
||||||
init_logging
|
|
||||||
]]
|
|
|
@ -1,45 +0,0 @@
|
||||||
import logging
|
|
||||||
import logging.handlers
|
|
||||||
|
|
||||||
|
|
||||||
basic_formatter = logging.Formatter(
|
|
||||||
fmt='[{levelname}] {message}',
|
|
||||||
style='{')
|
|
||||||
detailed_formatter = logging.Formatter(
|
|
||||||
fmt='[{asctime}] [{levelname}:{filename}:{lineno}] {message}',
|
|
||||||
style='{')
|
|
||||||
basic_console_handler = logging.StreamHandler()
|
|
||||||
basic_console_handler.setLevel(logging.INFO)
|
|
||||||
basic_console_handler.setFormatter(basic_formatter)
|
|
||||||
detailed_console_handler = logging.StreamHandler()
|
|
||||||
detailed_console_handler.setLevel(logging.DEBUG)
|
|
||||||
detailed_console_handler.setFormatter(detailed_formatter)
|
|
||||||
|
|
||||||
|
|
||||||
def get_file_handler(filename: str) -> logging.Handler:
|
|
||||||
handler = logging.handlers.RotatingFileHandler(
|
|
||||||
filename=filename,
|
|
||||||
maxBytes=1000000,
|
|
||||||
backupCount=10,
|
|
||||||
delay=True,
|
|
||||||
encoding='utf8',
|
|
||||||
)
|
|
||||||
handler.setLevel(logging.DEBUG)
|
|
||||||
handler.setFormatter(detailed_formatter)
|
|
||||||
return handler
|
|
||||||
|
|
||||||
|
|
||||||
def init_logging(verbose: bool, log_filename: str):
|
|
||||||
"""
|
|
||||||
Initializes the Amanuensis logger settings
|
|
||||||
"""
|
|
||||||
logger = logging.getLogger("amanuensis")
|
|
||||||
if log_filename:
|
|
||||||
logger.addHandler(get_file_handler(log_filename))
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
elif verbose:
|
|
||||||
logger.addHandler(detailed_console_handler)
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
else:
|
|
||||||
logger.addHandler(basic_console_handler)
|
|
||||||
logger.setLevel(logging.INFO)
|
|
|
@ -17,6 +17,7 @@ black = "^21.5b2"
|
||||||
mypy = "^0.812"
|
mypy = "^0.812"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
|
amanuensis-cli = "amanuensis.cli:main"
|
||||||
amanuensis-server = "amanuensis.server:run"
|
amanuensis-server = "amanuensis.server:run"
|
||||||
|
|
||||||
[tool.black]
|
[tool.black]
|
||||||
|
|
Loading…
Reference in New Issue