Add models module to replace current models later
This commit is contained in:
parent
9e4144eccf
commit
4b33f17169
@ -6,6 +6,7 @@ import os
|
||||
# Module imports
|
||||
from amanuensis.errors import MissingConfigError, MalformedConfigError
|
||||
import amanuensis.config.context
|
||||
from amanuensis.config.context import is_guid
|
||||
import amanuensis.config.init
|
||||
import amanuensis.config.loader
|
||||
|
||||
|
@ -68,7 +68,9 @@ class ConfigDirectoryContext():
|
||||
|
||||
|
||||
class ConfigFileMixin():
|
||||
"""Mixin for objects that have config files."""
|
||||
def config(self, edit=False):
|
||||
"""Context manager for this object's config file."""
|
||||
if edit:
|
||||
return self.edit('config')
|
||||
else:
|
||||
|
@ -1,26 +1,32 @@
|
||||
class AmanuensisError(Exception):
|
||||
"""Base class for exceptions in amanuensis"""
|
||||
|
||||
|
||||
class MissingConfigError(AmanuensisError):
|
||||
"""A config file is missing that was expected to be present"""
|
||||
def __init__(self, path):
|
||||
super().__init__("A config file or directory was expected to "
|
||||
f"exist, but could not be found: {path}")
|
||||
|
||||
|
||||
class ConfigAlreadyExistsError(AmanuensisError):
|
||||
"""Attempted to create a config, but it already exists"""
|
||||
def __init__(self, path):
|
||||
super().__init__("Attempted to create a config, but it already "
|
||||
f"exists: {path}")
|
||||
|
||||
|
||||
class MalformedConfigError(AmanuensisError):
|
||||
"""A config file could not be read and parsed"""
|
||||
|
||||
|
||||
class ReadOnlyError(AmanuensisError):
|
||||
"""A config was edited in readonly mode"""
|
||||
|
||||
|
||||
class ArgumentError(AmanuensisError):
|
||||
"""An internal call was made with invalid arguments"""
|
||||
|
||||
|
||||
class IndexMismatchError(AmanuensisError):
|
||||
"""An id was obtained from an index, but the object doesn't exist"""
|
||||
|
11
amanuensis/models/__init__.py
Normal file
11
amanuensis/models/__init__.py
Normal file
@ -0,0 +1,11 @@
|
||||
from amanuensis.models.factory import ModelFactory
|
||||
from amanuensis.models.lexicon import LexiconModel
|
||||
from amanuensis.models.user import UserModelBase, UserModel, AnonymousUserModel
|
||||
|
||||
__all__ = [
|
||||
'ModelFactory',
|
||||
'LexiconModel',
|
||||
'UserModelBase',
|
||||
'UserModel',
|
||||
'AnonymousUserModel',
|
||||
]
|
46
amanuensis/models/factory.py
Normal file
46
amanuensis/models/factory.py
Normal file
@ -0,0 +1,46 @@
|
||||
from amanuensis.models.user import UserModel
|
||||
from amanuensis.models.lexicon import LexiconModel
|
||||
from amanuensis.config import is_guid
|
||||
from amanuensis.config.context import RootConfigDirectoryContext
|
||||
from amanuensis.errors import ArgumentError
|
||||
|
||||
|
||||
class ModelFactory():
|
||||
def __init__(self, root: RootConfigDirectoryContext):
|
||||
self.root: RootConfigDirectoryContext = root
|
||||
|
||||
def user(self, identifier: str) -> UserModel:
|
||||
"""Get the user model for the given id or username"""
|
||||
# Ensure we have something to work with
|
||||
if identifier is None:
|
||||
raise ArgumentError('identifer must not be None')
|
||||
# Ensure we have a user guid
|
||||
if not is_guid(identifier):
|
||||
with self.root.user.index() as index:
|
||||
uid = index.get(identifier, None)
|
||||
if uid is None:
|
||||
raise KeyError(f'Unknown username: {identifier})')
|
||||
if not is_guid(uid):
|
||||
raise ValueError(f'Invalid index entry: {uid}')
|
||||
else:
|
||||
uid = identifier
|
||||
user = UserModel(self.root, uid)
|
||||
return user
|
||||
|
||||
def lexicon(self, identifier: str) -> LexiconModel:
|
||||
"""Get the lexicon model for the given id or name"""
|
||||
# Ensure we have something to work with
|
||||
if identifier is None:
|
||||
raise ArgumentError('identifier must not be None')
|
||||
# Ensure we have a lexicon guid
|
||||
if not is_guid(identifier):
|
||||
with self.root.lexicon.index() as index:
|
||||
lid = index.get(identifier, None)
|
||||
if lid is None:
|
||||
raise KeyError(f'Unknown lexicon: {identifier}')
|
||||
if not is_guid(lid):
|
||||
raise ValueError(f'Invalid index entry: {lid}')
|
||||
else:
|
||||
lid = identifier
|
||||
lexicon = LexiconModel(self.root, lid)
|
||||
return lexicon
|
61
amanuensis/models/lexicon.py
Normal file
61
amanuensis/models/lexicon.py
Normal file
@ -0,0 +1,61 @@
|
||||
import time
|
||||
|
||||
from amanuensis.config.context import (
|
||||
RootConfigDirectoryContext,
|
||||
LexiconConfigDirectoryContext)
|
||||
from amanuensis.config.loader import ReadOnlyOrderedDict, json_rw
|
||||
|
||||
|
||||
class LexiconModel():
|
||||
"""Represents a lexicon in the Amanuensis config store"""
|
||||
def __init__(self, root: RootConfigDirectoryContext, lid: str):
|
||||
self._lid: str = lid
|
||||
# Creating the config context implicitly checks for existence
|
||||
self._ctx: LexiconConfigDirectoryContext = root.lexicon[lid]
|
||||
with self._ctx.config(edit=False) as config:
|
||||
self._cfg: ReadOnlyOrderedDict = config
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'<Lexicon {self.cfg.name}>'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'<LexiconModel({self.lid})>'
|
||||
|
||||
# Properties
|
||||
|
||||
@property
|
||||
def lid(self) -> str:
|
||||
"""Lexicon guid"""
|
||||
return self._lid
|
||||
|
||||
@property
|
||||
def ctx(self) -> LexiconConfigDirectoryContext:
|
||||
"""Lexicon config directory context"""
|
||||
return self._ctx
|
||||
|
||||
@property
|
||||
def cfg(self) -> ReadOnlyOrderedDict:
|
||||
"""Cached lexicon config"""
|
||||
return self._cfg
|
||||
|
||||
# Utilities
|
||||
|
||||
@property
|
||||
def title(self) -> str:
|
||||
return self.cfg.get('title', f'Lexicon {self.cfg.name}')
|
||||
|
||||
def edit(self) -> json_rw:
|
||||
return self.ctx.config(edit=True)
|
||||
|
||||
def log(self, message: str) -> None:
|
||||
now = int(time.time())
|
||||
with self.edit() as cfg:
|
||||
cfg.log.append([now, message])
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
if self.cfg.turn.current is None:
|
||||
return "unstarted"
|
||||
if self.cfg.turn.current > self.cfg.turn.max:
|
||||
return "completed"
|
||||
return "ongoing"
|
80
amanuensis/models/user.py
Normal file
80
amanuensis/models/user.py
Normal file
@ -0,0 +1,80 @@
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
|
||||
from amanuensis.config.context import (
|
||||
RootConfigDirectoryContext,
|
||||
UserConfigDirectoryContext)
|
||||
from amanuensis.config.loader import ReadOnlyOrderedDict
|
||||
|
||||
|
||||
class UserModelBase():
|
||||
"""Common base class for auth and anon user models"""
|
||||
|
||||
# Properties
|
||||
|
||||
@property
|
||||
def uid(self) -> str:
|
||||
"""User guid"""
|
||||
return getattr(self, '_uid', None)
|
||||
|
||||
@property
|
||||
def ctx(self) -> UserConfigDirectoryContext:
|
||||
"""User config directory context"""
|
||||
return getattr(self, '_ctx', None)
|
||||
|
||||
@property
|
||||
def cfg(self) -> ReadOnlyOrderedDict:
|
||||
"""Cached user config"""
|
||||
return getattr(self, '_cfg', None)
|
||||
|
||||
# Flask-Login interfaces
|
||||
|
||||
@property
|
||||
def is_authenticated(self) -> bool:
|
||||
return self.uid is not None
|
||||
|
||||
@property
|
||||
def is_active(self) -> bool:
|
||||
return self.uid is not None
|
||||
|
||||
@property
|
||||
def is_anonymous(self) -> bool:
|
||||
return self.uid is None
|
||||
|
||||
def get_id(self) -> str:
|
||||
return self.uid
|
||||
|
||||
|
||||
class UserModel(UserModelBase):
|
||||
"""Represents a user in the Amanuensis config store"""
|
||||
def __init__(self, root: RootConfigDirectoryContext, uid: str):
|
||||
self._uid: str = uid
|
||||
# Creating the config context implicitly checks for existence
|
||||
self._ctx: UserConfigDirectoryContext = root.user[uid]
|
||||
with self._ctx.config(edit=False) as config:
|
||||
self._cfg: ReadOnlyOrderedDict = config
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'<{self.cfg.username}>'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'<UserModel({self.uid})>'
|
||||
|
||||
# Utility methods
|
||||
|
||||
def set_password(self, password: str) -> None:
|
||||
pw_hash = generate_password_hash(password)
|
||||
with self.ctx.config(edit=True) as cfg:
|
||||
cfg['password'] = pw_hash
|
||||
|
||||
def check_password(self, password) -> bool:
|
||||
with self.ctx.config() as cfg:
|
||||
return check_password_hash(cfg.password, password)
|
||||
|
||||
|
||||
class AnonymousUserModel(UserModelBase):
|
||||
"""Represents an anonymous user"""
|
||||
def __str__(self) -> str:
|
||||
return '<Anonymous>'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return '<AnonymousUserModel>'
|
Loading…
Reference in New Issue
Block a user