Compare commits
5 Commits
d29a656c98 ... e23019bff6
Author | SHA1 | Date
---|---|---
Tim Van Baak | e23019bff6 |
Tim Van Baak | dd144bf207 |
Tim Van Baak | 3ab6e2c48d |
Tim Van Baak | 7e3db84b3c |
Tim Van Baak | bea38d67df |
@@ -0,0 +1,45 @@
from argparse import ArgumentParser
from typing import Optional
import os


class AmanuensisConfig:
    """Base config type. Defines config keys for subclasses to override."""

    # If CONFIG_FILE is defined, the config file it points to may override
    # config values defined on the config object itself.
    CONFIG_FILE: Optional[str] = None
    STATIC_ROOT: Optional[str] = "static"
    SECRET_KEY: Optional[str] = "secret"
    DATABASE_URI: Optional[str] = "sqlite:///:memory:"
    TESTING: bool = False


class EnvironmentConfig(AmanuensisConfig):
    """Loads config values from environment variables."""

    CONFIG_FILE = os.environ.get("AMANUENSIS_CONFIG_FILE", AmanuensisConfig.CONFIG_FILE)
    STATIC_ROOT = os.environ.get("AMANUENSIS_STATIC_ROOT", AmanuensisConfig.STATIC_ROOT)
    SECRET_KEY = os.environ.get("AMANUENSIS_SECRET_KEY", AmanuensisConfig.SECRET_KEY)
    DATABASE_URI = os.environ.get(
        "AMANUENSIS_DATABASE_URI", AmanuensisConfig.DATABASE_URI
    )
    TESTING = os.environ.get("AMANUENSIS_TESTING", "").lower() in ("true", "1")


class CommandLineConfig(AmanuensisConfig):
    """Loads config values from command line arguments."""

    def __init__(self) -> None:
        parser = ArgumentParser()
        parser.add_argument("--config-file", default=AmanuensisConfig.CONFIG_FILE)
        parser.add_argument("--static-root", default=AmanuensisConfig.STATIC_ROOT)
        parser.add_argument("--secret-key", default=AmanuensisConfig.SECRET_KEY)
        parser.add_argument("--database-uri", default=AmanuensisConfig.DATABASE_URI)
        parser.add_argument("--debug", action="store_true")
        args = parser.parse_args()

        self.CONFIG_FILE = args.config_file
        self.STATIC_ROOT = args.static_root
        self.SECRET_KEY = args.secret_key
        self.DATABASE_URI = args.database_uri
        self.TESTING = args.debug
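The new config module replaces the old directory-based configuration: `AmanuensisConfig` holds defaults, and the subclasses override them from the environment or from argv. As a minimal sketch (not part of the diff; the database path is an arbitrary example), the environment-driven variant picks up `AMANUENSIS_*` variables, but only if they are set before `amanuensis.config` is imported, because the class attributes are evaluated when the class body runs:

```python
# Sketch: exercising EnvironmentConfig. The env vars must be set before the
# module is imported; the database path here is an example value.
import os

os.environ["AMANUENSIS_DATABASE_URI"] = "sqlite:///amanuensis.db"
os.environ["AMANUENSIS_TESTING"] = "1"

from amanuensis.config import EnvironmentConfig

config = EnvironmentConfig()
assert config.DATABASE_URI == "sqlite:///amanuensis.db"
assert config.TESTING is True
```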
@@ -1,23 +0,0 @@
# Module imports
from .dict import AttrOrderedDict, ReadOnlyOrderedDict
from .directory import (
    RootConfigDirectoryContext,
    UserConfigDirectoryContext,
    LexiconConfigDirectoryContext,
    is_guid)

# Environment variable name constants
ENV_SECRET_KEY = "AMANUENSIS_SECRET_KEY"
ENV_CONFIG_DIR = "AMANUENSIS_CONFIG_DIR"
ENV_LOG_FILE = "AMANUENSIS_LOG_FILE"
ENV_LOG_FILE_SIZE = "AMANUENSIS_LOG_FILE_SIZE"
ENV_LOG_FILE_NUM = "AMANUENSIS_LOG_FILE_NUM"

__all__ = [
    AttrOrderedDict.__name__,
    ReadOnlyOrderedDict.__name__,
    RootConfigDirectoryContext.__name__,
    UserConfigDirectoryContext.__name__,
    LexiconConfigDirectoryContext.__name__,
    is_guid.__name__,
]
@@ -1,82 +0,0 @@
"""
`with` context managers for mediating config file access.
"""
# Standard library imports
import fcntl
import json

# Application imports
from .dict import AttrOrderedDict, ReadOnlyOrderedDict


class open_lock():
    """A context manager that opens a file with the specified file lock"""
    def __init__(self, path, mode, lock_type):
        self.fd = open(path, mode, encoding='utf8')
        fcntl.lockf(self.fd, lock_type)

    def __enter__(self):
        return self.fd

    def __exit__(self, exc_type, exc_value, traceback):
        fcntl.lockf(self.fd, fcntl.LOCK_UN)
        self.fd.close()


class open_sh(open_lock):
    """A context manager that opens a file with a shared lock"""
    def __init__(self, path, mode):
        super().__init__(path, mode, fcntl.LOCK_SH)


class open_ex(open_lock):
    """A context manager that opens a file with an exclusive lock"""
    def __init__(self, path, mode):
        super().__init__(path, mode, fcntl.LOCK_EX)


class json_ro(open_sh):
    """
    A context manager that opens a file in a shared, read-only mode.
    The contents of the file are read as JSON and returned as a read-
    only OrderedDict.
    """
    def __init__(self, path):
        super().__init__(path, 'r')
        self.config = None

    def __enter__(self) -> ReadOnlyOrderedDict:
        self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict)
        return self.config


class json_rw(open_ex):
    """
    A context manager that opens a file with an exclusive lock. The
    file mode defaults to r+, which requires that the file exist. The
    file mode can be set to w+ to create a new file by setting the new
    kwarg in the ctor. The contents of the file are read as JSON and
    returned in an AttrOrderedDict. Any changes to the context dict
    will be written out to the file when the context manager exits,
    unless an exception is raised before exiting.
    """
    def __init__(self, path, new=False):
        mode = 'w+' if new else 'r+'
        super().__init__(path, mode)
        self.config = None
        self.new = new

    def __enter__(self) -> AttrOrderedDict:
        if not self.new:
            self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict)
        else:
            self.config = AttrOrderedDict()
        return self.config

    def __exit__(self, exc_type, exc_value, traceback):
        # Only write the new value out if there wasn't an exception
        if not exc_type:
            self.fd.seek(0)
            json.dump(self.config, self.fd, allow_nan=False, indent='\t')
            self.fd.truncate()
        super().__exit__(exc_type, exc_value, traceback)
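For orientation, the removed lock-guarded context managers were used roughly like this (a sketch of the old API; the file name is a placeholder, and `secret_key` is one of the keys the old global config carried):

```python
# Sketch of the removed lock-guarded JSON context managers.
from amanuensis.config.context import json_ro, json_rw

with json_ro("config.json") as cfg:
    secret = cfg.secret_key        # shared lock, read-only dict

with json_rw("config.json") as cfg:
    cfg.secret_key = "0123abcd"    # exclusive lock, written back on clean exit
```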
@@ -1,52 +0,0 @@
"""
Dictionary classes used to represent JSON config files in memory.
"""
from collections import OrderedDict

from amanuensis.errors import ReadOnlyError


class AttrOrderedDict(OrderedDict):
    """
    An OrderedDict with attribute access to known keys and explicit
    creation of new keys.
    """
    def __getattr__(self, key):
        if key not in self:
            raise AttributeError(key)
        return self[key]

    def __setattr__(self, key, value):
        if key not in self:
            raise AttributeError(key)
        self[key] = value

    def new(self, key, value):
        """Setter for adding new keys"""
        if key in self:
            raise KeyError("Key already exists: '{}'".format(key))
        self[key] = value


class ReadOnlyOrderedDict(OrderedDict):
    """
    An OrderedDict that cannot be modified with attribute access to
    known keys.
    """
    def __readonly__(self, *args, **kwargs):
        raise ReadOnlyError("Cannot modify a ReadOnlyOrderedDict")

    def __init__(self, *args, **kwargs):
        super(ReadOnlyOrderedDict, self).__init__(*args, **kwargs)
        self.__setitem__ = self.__readonly__
        self.__delitem__ = self.__readonly__
        self.pop = self.__readonly__
        self.popitem = self.__readonly__
        self.clear = self.__readonly__
        self.update = self.__readonly__
        self.setdefault = self.__readonly__

    def __getattr__(self, key):
        if key not in self:
            raise AttributeError(key)
        return self[key]
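The two dict classes gave the JSON configs their access rules: attribute reads and writes only work for keys that already exist, and `AttrOrderedDict.new()` is the only way to add one. A brief illustration of those semantics (values are invented):

```python
# Sketch of the removed AttrOrderedDict's access rules.
from amanuensis.config.dict import AttrOrderedDict

d = AttrOrderedDict()
d.new("title", "Lexicon")          # new keys must be created explicitly
d.title = "Lexicon Prime"          # attribute write to a known key
assert d["title"] == "Lexicon Prime"
try:
    d.missing = 1                  # unknown key
except AttributeError:
    pass                           # __setattr__ rejects keys not already present
```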
@@ -1,160 +0,0 @@
"""
Config directory abstractions that encapsulate path munging and context
manager usage.
"""
import os
import re
from typing import Iterable

from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError

from .context import json_ro, json_rw


def is_guid(s: str) -> bool:
    return bool(re.match(r'[0-9a-z]{32}', s.lower()))


class ConfigDirectoryContext():
    """
    Base class for CRUD operations on config files in a config
    directory.
    """
    def __init__(self, path: str):
        self.path: str = path
        if not os.path.isdir(self.path):
            raise MissingConfigError(path)

    def new(self, filename) -> json_rw:
        """
        Creates a JSON file that doesn't already exist.
        """
        if not filename.endswith('.json'):
            filename = f'{filename}.json'
        fpath: str = os.path.join(self.path, filename)
        if os.path.isfile(fpath):
            raise ConfigAlreadyExistsError(fpath)
        return json_rw(fpath, new=True)

    def read(self, filename) -> json_ro:
        """
        Loads a JSON file in read-only mode.
        """
        if not filename.endswith('.json'):
            filename = f'{filename}.json'
        fpath: str = os.path.join(self.path, filename)
        if not os.path.isfile(fpath):
            raise MissingConfigError(fpath)
        return json_ro(fpath)

    def edit(self, filename, create=False) -> json_rw:
        """
        Loads a JSON file in write mode.
        """
        if not filename.endswith('.json'):
            filename = f'{filename}.json'
        fpath: str = os.path.join(self.path, filename)
        if not create and not os.path.isfile(fpath):
            raise MissingConfigError(fpath)
        return json_rw(fpath, new=create)

    def delete(self, filename) -> None:
        """Deletes a file."""
        if not filename.endswith('.json'):
            filename = f'{filename}.json'
        fpath: str = os.path.join(self.path, filename)
        if not os.path.isfile(fpath):
            raise MissingConfigError(fpath)
        os.remove(fpath)

    def ls(self) -> Iterable[str]:
        """Lists all files in this directory."""
        filenames: Iterable[str] = os.listdir(self.path)
        return filenames


class ConfigFileConfigDirectoryContext(ConfigDirectoryContext):
    """
    Config directory with a `config.json`.
    """
    def __init__(self, path: str):
        super().__init__(path)
        config_path = os.path.join(self.path, 'config.json')
        if not os.path.isfile(config_path):
            raise MissingConfigError(config_path)

    def edit_config(self) -> json_rw:
        """rw context manager for this object's config file."""
        return self.edit('config')

    def read_config(self) -> json_ro:
        """ro context manager for this object's config file."""
        return self.read('config')


class IndexDirectoryContext(ConfigDirectoryContext):
    """
    A lookup layer for getting config directory contexts for lexicon
    or user directories.
    """
    def __init__(self, path: str, cdc_type: type):
        super().__init__(path)
        index_path = os.path.join(self.path, 'index.json')
        if not os.path.isfile(index_path):
            raise MissingConfigError(index_path)
        self.cdc_type = cdc_type

    def __getitem__(self, key: str) -> ConfigFileConfigDirectoryContext:
        """
        Returns a context to the given item. key is treated as the
        item's id if it's a guid string, otherwise it's treated as
        the item's indexed name and run through the index first.
        """
        if not is_guid(key):
            with self.read_index() as index:
                iid = index.get(key)
                if not iid:
                    raise MissingConfigError(key)
                key = iid
        return self.cdc_type(os.path.join(self.path, key))

    def edit_index(self) -> json_rw:
        return self.edit('index')

    def read_index(self) -> json_ro:
        return self.read('index')


class RootConfigDirectoryContext(ConfigFileConfigDirectoryContext):
    """
    Context for the config directory with links to the lexicon and
    user contexts.
    """
    def __init__(self, path):
        super().__init__(path)
        self.lexicon: IndexDirectoryContext = IndexDirectoryContext(
            os.path.join(self.path, 'lexicon'),
            LexiconConfigDirectoryContext)
        self.user: IndexDirectoryContext = IndexDirectoryContext(
            os.path.join(self.path, 'user'),
            UserConfigDirectoryContext)


class LexiconConfigDirectoryContext(ConfigFileConfigDirectoryContext):
    """
    A config context for a lexicon's config directory.
    """
    def __init__(self, path):
        super().__init__(path)
        self.draft: ConfigDirectoryContext = ConfigDirectoryContext(
            os.path.join(self.path, 'draft'))
        self.src: ConfigDirectoryContext = ConfigDirectoryContext(
            os.path.join(self.path, 'src'))
        self.article: ConfigDirectoryContext = ConfigDirectoryContext(
            os.path.join(self.path, 'article'))


class UserConfigDirectoryContext(ConfigFileConfigDirectoryContext):
    """
    A config context for a user's config directory.
    """
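These context classes layered path handling on top of the lock-guarded JSON managers; the index directories mapped human-readable names to GUID-named folders. A sketch of how a caller navigated them (the directory name "amanuensis" is the old default, while the lexicon name and config key are placeholders):

```python
# Sketch of the removed directory-context API.
from amanuensis.config.directory import RootConfigDirectoryContext

root = RootConfigDirectoryContext("amanuensis")
with root.lexicon["Lexicon Prime"].read_config() as cfg:   # name -> GUID -> context
    title = cfg.title                                       # hypothetical config key
```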
@@ -1,96 +0,0 @@
# Standard library imports
from collections import OrderedDict
import fcntl
import json
import os
import shutil

# Module imports
from amanuensis.resources import get_stream

from .context import json_ro, json_rw


def create_config_dir(config_dir, refresh=False):
    """
    Create or refresh a config directory
    """

    def prepend(*path):
        joined = os.path.join(*path)
        if not joined.startswith(config_dir):
            joined = os.path.join(config_dir, joined)
        return joined

    # Create the directory if it doesn't exist.
    if not os.path.isdir(config_dir):
        os.mkdir(config_dir)

    # The directory should be empty if we're not updating an existing one.
    if len(os.listdir(config_dir)) > 0 and not refresh:
        print("Directory {} is not empty".format(config_dir))
        return -1

    # Update or create global config.
    def_cfg = get_stream("global.json")
    global_config_path = prepend("config.json")
    if refresh and os.path.isfile(global_config_path):
        # We need to write an entirely different ordereddict to the config
        # file, so we mimic the config.context functionality manually.
        with open(global_config_path, 'r+', encoding='utf8') as cfg_file:
            fcntl.lockf(cfg_file, fcntl.LOCK_EX)
            old_cfg = json.load(cfg_file, object_pairs_hook=OrderedDict)
            new_cfg = json.load(def_cfg, object_pairs_hook=OrderedDict)
            merged = {}
            for key in new_cfg:
                merged[key] = old_cfg[key] if key in old_cfg else new_cfg[key]
                if key not in old_cfg:
                    print("Added key '{}' to config".format(key))
            for key in old_cfg:
                if key not in new_cfg:
                    print("Config contains unknown key '{}'".format(key))
                    merged[key] = old_cfg[key]
            cfg_file.seek(0)
            json.dump(merged, cfg_file, allow_nan=False, indent='\t')
            cfg_file.truncate()
            fcntl.lockf(cfg_file, fcntl.LOCK_UN)
    else:
        with open(prepend("config.json"), 'wb') as f:
            f.write(def_cfg.read())

    # Ensure lexicon subdir exists.
    if not os.path.isdir(prepend("lexicon")):
        os.mkdir(prepend("lexicon"))
    if not os.path.isfile(prepend("lexicon", "index.json")):
        with open(prepend("lexicon", "index.json"), 'w') as f:
            json.dump({}, f)

    # Ensure user subdir exists.
    if not os.path.isdir(prepend("user")):
        os.mkdir(prepend("user"))
    if not os.path.isfile(prepend('user', 'index.json')):
        with open(prepend('user', 'index.json'), 'w') as f:
            json.dump({}, f)

    if refresh:
        for dir_name in ('lexicon', 'user'):
            # Clean up unindexed folders
            with json_ro(prepend(dir_name, 'index.json')) as index:
                known = list(index.values())
            entries = os.listdir(prepend(dir_name))
            for dir_entry in entries:
                if dir_entry == "index.json":
                    continue
                if dir_entry in known:
                    continue
                print("Removing unindexed folder: '{}/{}'"
                    .format(dir_name, dir_entry))
                shutil.rmtree(prepend(dir_name, dir_entry))

            # Remove orphaned index listings
            with json_rw(prepend(dir_name, 'index.json')) as index:
                for name, entry in index.items():
                    if not os.path.isdir(prepend(dir_name, entry)):
                        print("Removing stale {} index entry '{}: {}'"
                            .format(dir_name, name, entry))
                        del index[name]
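This bootstrap routine was what stood up the on-disk layout the directory contexts expected. Roughly, it was invoked like this (the module path is assumed, since file names are not shown in this view; "amanuensis" is the old default directory name):

```python
# Sketch of the removed bootstrap helper (module path assumed).
from amanuensis.config.init import create_config_dir

create_config_dir("amanuensis")                 # create a fresh config directory
create_config_dir("amanuensis", refresh=True)   # merge new default keys, prune stale entries
```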
@@ -3,8 +3,12 @@ Database connection setup
 """
 from sqlalchemy import create_engine, MetaData, event
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import scoped_session
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import scoped_session, sessionmaker
+
+try:
+    from greenlet import getcurrent as get_ident
+except ImportError:
+    from threading import get_ident
 
 
 # Define naming conventions for generated constraints
@@ -34,7 +38,9 @@ class DbContext:
             cursor.close()
 
         # Create a thread-safe session factory
-        self.session = scoped_session(sessionmaker(bind=self.engine))
+        self.session = scoped_session(
+            sessionmaker(bind=self.engine), scopefunc=get_ident
+        )
 
     def __call__(self, *args, **kwargs):
         """Provides shortcut access to session.execute."""
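The substantive change here is the `scopefunc`: keying the registry on `get_ident` gives each thread, or each greenlet when greenlet is installed, its own Session, and `remove()` discards it. A rough sketch of that behavior, assuming `DbContext` takes just the database URI the way the server factory below passes it:

```python
# Sketch: one Session per thread/greenlet via the scoped_session registry.
from amanuensis.db import DbContext

db = DbContext("sqlite:///:memory:")

first = db.session()               # Session for the current thread/greenlet
second = db.session()              # the registry returns the same object
assert first is second

db.session.remove()                # what the app does on request teardown
assert db.session() is not first   # a fresh Session afterwards
```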
@@ -1,46 +1,62 @@
-import os
-
-from flask import Flask
-
-from amanuensis.config import RootConfigDirectoryContext, ENV_CONFIG_DIR
-from amanuensis.models import ModelFactory
-from .auth import get_login_manager, bp_auth
-from .helpers import register_custom_filters
-from .home import bp_home
-from .lexicon import bp_lexicon
-from .session import bp_session
-
-
-def get_app(root: RootConfigDirectoryContext) -> Flask:
-    # Flask app init
-    with root.read_config() as cfg:
-        app = Flask(
-            __name__,
-            template_folder='.',
-            static_folder=cfg.static_root
-        )
-        app.secret_key = bytes.fromhex(cfg.secret_key)
-        app.config['root'] = root
-        app.config['model_factory'] = ModelFactory(root)
-        app.jinja_options['trim_blocks'] = True
-        app.jinja_options['lstrip_blocks'] = True
-        register_custom_filters(app)
-
-    # Flask-Login init
-    login_manager = get_login_manager(root)
-    login_manager.init_app(app)
-
-    # Blueprint inits
-    app.register_blueprint(bp_auth)
-    app.register_blueprint(bp_home)
-    app.register_blueprint(bp_lexicon)
-    app.register_blueprint(bp_session)
-
-    return app
-
-
-def default():
-    cwd = os.getcwd()
-    config_dir = os.environ.get(ENV_CONFIG_DIR, "amanuensis")
-    root = RootConfigDirectoryContext(os.path.join(cwd, config_dir))
-    return get_app(root)
+import json
+
+from flask import Flask, g
+
+from amanuensis.config import AmanuensisConfig, CommandLineConfig
+from amanuensis.db import DbContext
+
+
+def get_app(
+    config: AmanuensisConfig,
+    db: DbContext = None,
+) -> Flask:
+    """Application factory"""
+    # Create the Flask object
+    app = Flask(__name__, template_folder=".", static_folder=config.STATIC_ROOT)
+
+    # Load keys from the config object
+    app.config.from_object(config)
+
+    # If a config file is now specified, also load keys from there
+    if app.config.get("CONFIG_FILE", None):
+        app.config.from_file(app.config["CONFIG_FILE"], json.load)
+
+    # Assert that all required config values are now set
+    for config_key in ("SECRET_KEY", "DATABASE_URI"):
+        if not app.config.get(config_key):
+            raise Exception(f"{config_key} must be defined")
+
+    # Create the database context, if one wasn't already given
+    if db is None:
+        db = DbContext(app.config["DATABASE_URI"])
+
+    # Make the database connection available to requests via g
+    def db_setup():
+        g.db = db
+    app.before_request(db_setup)
+
+    # Tear down the session on request teardown
+    def db_teardown(response_or_exc):
+        db.session.remove()
+    app.teardown_appcontext(db_teardown)
+
+    # Configure jinja options
+    app.jinja_options.update(trim_blocks=True, lstrip_blocks=True)
+
+    # Set up Flask-Login
+    # TODO
+
+    # Register blueprints
+    # TODO
+
+    def test():
+        return "Hello, world!"
+    app.route("/")(test)
+
+    return app
+
+
+def run():
+    """Run the server, populating the config from the command line."""
+    config = CommandLineConfig()
+    get_app(config).run(debug=config.TESTING)
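Blueprints and Flask-Login are still TODO here, so request handlers are limited to the placeholder route. As a hypothetical sketch of what the `g.db` wiring enables (the route, table name, and query are invented for illustration and are not part of this change):

```python
# Hypothetical view using the per-request DbContext that db_setup() puts on g.
from flask import g, jsonify
from sqlalchemy import text

def count_users():
    count = g.db.session.execute(text("SELECT COUNT(*) FROM user")).scalar()
    return jsonify(count=count)

# Registered the same way as the placeholder route:
# app.route("/users/count")(count_users)
```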
@@ -16,6 +16,9 @@ pytest = "^5.2"
 black = "^21.5b2"
 mypy = "^0.812"
 
+[tool.poetry.scripts]
+amanuensis-server = "amanuensis.server:run"
+
 [tool.black]
 extend-exclude = "^/amanuensis/cli/.*|^/amanuensis/config/.*|^/amanuensis/lexicon/.*|^/amanuensis/log/.*|^/amanuensis/models/.*|^/amanuensis/resources/.*|^/amanuensis/server/.*|^/amanuensis/user/.*|^/amanuensis/__main__.py"
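Assuming a standard Poetry setup, the new `[tool.poetry.scripts]` entry should make the dev server startable with `poetry run amanuensis-server`, which resolves to the `run()` wrapper added in `amanuensis.server` above.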
@@ -8,6 +8,8 @@ import amanuensis.backend.character as charq
 import amanuensis.backend.lexicon as lexiq
 import amanuensis.backend.membership as memq
 import amanuensis.backend.user as userq
+from amanuensis.config import AmanuensisConfig
+from amanuensis.server import get_app
 
 
 @pytest.fixture
@@ -122,3 +124,16 @@ def lexicon_with_editor(make):
     )
     assert membership
     return (lexicon, editor)
+
+
+class TestConfig(AmanuensisConfig):
+    TESTING = True
+    SECRET_KEY = "secret key"
+    DATABASE_URI = "sqlite:///:memory:"
+
+
+@pytest.fixture
+def app(db):
+    """Provides an application running on top of the test database."""
+    server_app = get_app(TestConfig, db)
+    return server_app
@@ -0,0 +1,13 @@
from flask import Flask


def test_app_testing(app: Flask):
    """Confirm that the test config loads correctly."""
    assert app.testing


def test_client(app: Flask):
    """Test that the test client works."""
    with app.test_client() as client:
        response = client.get("/")
        assert b"world" in response.data