Get home page and login working #14

Merged
Jaculabilis merged 20 commits from tvb/server-auth into develop 2021-06-29 03:23:59 +00:00
45 changed files with 766 additions and 890 deletions

View File

@ -1,99 +0,0 @@
# Standard library imports
import argparse
import logging
import os
import sys
# Module imports
from amanuensis.cli import describe_commands, get_commands
from amanuensis.config import (
RootConfigDirectoryContext,
ENV_CONFIG_DIR,
ENV_LOG_FILE)
from amanuensis.errors import AmanuensisError
from amanuensis.log import init_logging
from amanuensis.models import ModelFactory
def process_doc(docstring):
return '\n'.join([
line.strip()
for line in (docstring or "").strip().splitlines()
])
def get_parser(valid_commands):
# Set up the top-level parser.
parser = argparse.ArgumentParser(
description=describe_commands(),
formatter_class=argparse.RawDescriptionHelpFormatter)
# The config directory.
parser.add_argument("--config-dir",
dest="config_dir",
default=os.environ.get(ENV_CONFIG_DIR, "./config"),
help="The config directory for Amanuensis")
# Logging settings.
parser.add_argument("--verbose", "-v",
action="store_true",
dest="verbose",
help="Enable verbose console logging")
parser.add_argument("--log-file",
dest="log_file",
default=os.environ.get(ENV_LOG_FILE),
help="Enable verbose file logging")
parser.set_defaults(func=lambda args: parser.print_help())
subp = parser.add_subparsers(
metavar="COMMAND",
dest="command",
help="The command to execute")
# Set up command subparsers.
# command_ functions perform setup or execution depending on
# whether their argument is an ArgumentParser.
for name, func in valid_commands.items():
# Create the subparser, set the docstring as the description.
cmd = subp.add_parser(name,
description=process_doc(func.__doc__),
formatter_class=argparse.RawDescriptionHelpFormatter,
aliases=func.__dict__.get("aliases", []))
# Delegate subparser setup to the command.
func(cmd)
# Store function for later execution.
cmd.set_defaults(func=func)
return parser
def main(argv):
# Enumerate valid commands from the CLI module.
commands = get_commands()
# Parse args
args = get_parser(commands).parse_args(argv)
# First things first, initialize logging
init_logging(args.verbose, args.log_file)
logger = logging.getLogger('amanuensis')
# The init command initializes a config directory at --config-dir.
# All other commands assume that the config dir already exists.
if args.command and args.command != "init":
args.root = RootConfigDirectoryContext(args.config_dir)
args.model_factory = ModelFactory(args.root)
# If verbose logging, dump args namespace
if args.verbose:
logger.debug('amanuensis')
for key, val in vars(args).items():
logger.debug(f' {key}: {val}')
# Execute command.
try:
args.func(args)
except AmanuensisError as e:
logger.error('Unexpected internal {}: {}'.format(
type(e).__name__, str(e)))
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

View File

@ -3,11 +3,11 @@ Lexicon query interface
"""
import re
from typing import Sequence
from typing import Sequence, Optional
from sqlalchemy import select, func
from amanuensis.db import DbContext, Lexicon
from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.errors import ArgumentError
@ -17,7 +17,7 @@ RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
def create(
db: DbContext,
name: str,
title: str,
title: Optional[str],
prompt: str,
) -> Lexicon:
"""
@ -55,6 +55,23 @@ def create(
return new_lexicon
def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]:
def from_name(db: DbContext, name: str) -> Lexicon:
"""Get a lexicon by its name."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
def get_all(db: DbContext) -> Sequence[Lexicon]:
"""Get all lexicons."""
return db(select(Lexicon)).scalars()
def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
"""Get all lexicons that a player is in."""
return db(
select(Lexicon).join(Lexicon.memberships).where(Membership.user_id == user_id)
).scalars()
def get_public(db: DbContext) -> Sequence[Lexicon]:
"""Get all publicly visible lexicons."""
return db(select(Lexicon).where(Lexicon.public == True)).scalars()

View File

@ -2,10 +2,12 @@
User query interface
"""
import datetime
import re
from typing import Sequence
from typing import Optional, Sequence
from sqlalchemy import select, func
from sqlalchemy import select, func, update
from werkzeug.security import generate_password_hash, check_password_hash
from amanuensis.db import DbContext, User
from amanuensis.errors import ArgumentError
@ -19,7 +21,7 @@ def create(
db: DbContext,
username: str,
password: str,
display_name: str,
display_name: Optional[str],
email: str,
is_site_admin: bool,
) -> User:
@ -59,7 +61,7 @@ def create(
new_user = User(
username=username,
password=password,
password=generate_password_hash(password),
display_name=display_name,
email=email,
is_site_admin=is_site_admin,
@ -69,6 +71,49 @@ def create(
return new_user
def get_all_users(db: DbContext) -> Sequence[User]:
def from_id(db: DbContext, user_id: int) -> Optional[User]:
"""
Get a user by the user's id.
Returns None if no user was found.
"""
user: User = db(select(User).where(User.id == user_id)).scalar_one_or_none()
return user
def from_username(db: DbContext, username: str) -> Optional[User]:
"""
Get a user by the user's username.
Returns None if no user was found.
"""
user: User = db(select(User).where(User.username == username)).scalar_one_or_none()
return user
def get_all(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)
db(update(User).where(User.username == username).values(password=password_hash))
db.session.commit()
def password_check(db: DbContext, username: str, password: str) -> bool:
"""Check if a password is correct."""
user_password_hash: str = db(
select(User.password).where(User.username == username)
).scalar_one()
return check_password_hash(user_password_hash, password)
def update_logged_in(db: DbContext, username: str) -> None:
"""Bump the value of the last_login column for a user."""
db(
update(User)
.where(User.username == username)
.values(last_login=datetime.datetime.now(datetime.timezone.utc))
)
db.session.commit()

View File

@ -1,10 +1,13 @@
from argparse import ArgumentParser
from argparse import ArgumentParser, Namespace
import logging
import logging.config
import os
from typing import Callable
import amanuensis.cli.admin
import amanuensis.cli.lexicon
import amanuensis.cli.user
from amanuensis.db import DbContext
LOGGING_CONFIG = {
@ -63,7 +66,7 @@ def add_subcommand(subparsers, module) -> None:
sc_name, help=sc_help, description=obj.__doc__
)
subcommand.set_defaults(func=obj)
for args, kwargs in obj.__dict__.get("add_argument", []):
for args, kwargs in reversed(obj.__dict__.get("add_argument", [])):
subcommand.add_argument(*args, **kwargs)
@ -76,6 +79,18 @@ def init_logger(args):
logging.config.dictConfig(LOGGING_CONFIG)
def get_db_factory(args: Namespace) -> Callable[[], DbContext]:
"""Factory function for lazy-loading the database in subcommands."""
def get_db() -> DbContext:
"""Lazy loader for the database connection."""
if not os.path.exists(args.db_path):
args.parser.error(f"No database found at {args.db_path}")
return DbContext(path=args.db_path, echo=args.verbose)
return get_db
def main():
"""CLI entry point"""
# Set up the top-level parser
@ -83,8 +98,12 @@ def main():
parser.set_defaults(
parser=parser,
func=lambda args: parser.print_usage(),
get_db=None,
)
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
parser.add_argument(
"--db", dest="db_path", default="db.sqlite", help="Path to Amanuensis database"
)
# Add commands from cli submodules
subparsers = parser.add_subparsers(metavar="COMMAND")
@ -92,7 +111,10 @@ def main():
add_subcommand(subparsers, amanuensis.cli.lexicon)
add_subcommand(subparsers, amanuensis.cli.user)
# Parse args and execute the desired action
# Parse args and perform top-level arg processing
args = parser.parse_args()
init_logger(args)
args.get_db = get_db_factory(args)
# Execute the desired action
args.func(args)

View File

@ -14,24 +14,17 @@ COMMAND_HELP = "Interact with Amanuensis."
LOG = logging.getLogger(__name__)
@add_argument(
"path", metavar="DB_PATH", help="Path to where the database should be created"
)
@add_argument("--force", "-f", action="store_true", help="Overwrite existing database")
@add_argument("--verbose", "-v", action="store_true", help="Enable db echo")
@add_argument("--drop", "-d", action="store_true", help="Overwrite existing database")
def command_init_db(args) -> int:
"""
Initialize the Amanuensis database.
"""
# Check if force is required
if not args.force and os.path.exists(args.path):
args.parser.error(f"{args.path} already exists and --force was not specified")
if args.drop:
open(args.db_path, mode="w").close()
# Initialize the database
db_uri = f"sqlite:///{os.path.abspath(args.path)}"
LOG.info(f"Creating database at {db_uri}")
db = DbContext(db_uri, debug=args.verbose)
db.create_all()
LOG.info(f"Creating database at {args.db_path}")
args.get_db().create_all()
LOG.info("Done")
return 0

View File

@ -1,5 +1,12 @@
import logging
from sqlalchemy import update
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, Lexicon
from .helpers import add_argument
@ -9,22 +16,56 @@ COMMAND_HELP = "Interact with lexicons."
LOG = logging.getLogger(__name__)
@add_argument("lexicon")
@add_argument("user")
@add_argument("--editor", action="store_true")
def command_add(args) -> int:
"""
Add a user to a lexicon.
"""
db: DbContext = args.get_db()
lexicon = lexiq.from_name(db, args.lexicon)
user = userq.from_username(db, args.user)
assert user is not None
memq.create(db, user.id, lexicon.id, args.editor)
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
return 0
@add_argument("name")
def command_create(args):
"""
Create a lexicon.
"""
raise NotImplementedError()
db: DbContext = args.get_db()
lexiq.create(db, args.name, None, f"Prompt for Lexicon {args.name}")
LOG.info(f"Created lexicon {args.name}")
return 0
def command_delete(args):
@add_argument("name")
@add_argument("--public", dest="public", action="store_const", const=True)
@add_argument("--no-public", dest="public", action="store_const", const=False)
@add_argument("--join", dest="join", action="store_const", const=True)
@add_argument("--no-join", dest="join", action="store_const", const=False)
def command_edit(args):
"""
Delete a lexicon.
Update a lexicon's configuration.
"""
raise NotImplementedError()
db: DbContext = args.get_db()
values = {}
if args.public == True:
values["public"] = True
elif args.public == False:
values["public"] = False
def command_list(args):
"""
List all lexicons and their statuses.
"""
raise NotImplementedError()
if args.join == True:
values["joinable"] = True
elif args.join == False:
values["joinable"] = False
result = db(update(Lexicon).where(Lexicon.name == args.name).values(**values))
LOG.info(f"Updated {result.rowcount} lexicons")
db.session.commit()
return 0 if result.rowcount == 1 else -1

View File

@ -1,4 +1,8 @@
import logging
from typing import Optional
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, User
from .helpers import add_argument
@ -9,11 +13,50 @@ COMMAND_HELP = "Interact with users."
LOG = logging.getLogger(__name__)
def command_create(args):
"""
Create a user.
"""
raise NotImplementedError()
@add_argument("username")
@add_argument("--password", default="password")
@add_argument("--email", default="")
def command_create(args) -> int:
"""Create a user."""
db: DbContext = args.get_db()
userq.create(db, args.username, "password", args.username, args.email, False)
userq.password_set(db, args.username, args.password)
LOG.info(f"Created user {args.username}")
return 0
@add_argument("username")
def command_promote(args) -> int:
"""Make a user a site admin."""
db: DbContext = args.get_db()
user: Optional[User] = userq.from_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1
if user.is_site_admin:
LOG.info(f"{user.username} is already a site admin.")
else:
user.is_site_admin = True
LOG.info(f"Promoting {user.username} to site admin.")
db.session.commit()
return 0
@add_argument("username")
def command_demote(args):
"""Revoke a user's site admin status."""
db: DbContext = args.get_db()
user: Optional[User] = userq.from_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1
if not user.is_site_admin:
LOG.info(f"{user.username} is not a site admin.")
else:
user.is_site_admin = False
LOG.info(f"Revoking site admin status for {user.username}.")
db.session.commit()
return 0
def command_delete(args):
@ -30,8 +73,13 @@ def command_list(args):
raise NotImplementedError()
def command_passwd(args):
@add_argument("username")
@add_argument("password")
def command_passwd(args) -> int:
"""
Set a user's password.
"""
raise NotImplementedError()
db: DbContext = args.get_db()
userq.password_set(db, args.username, args.password)
LOG.info(f"Updated password for {args.username}")
return 0

View File

@ -1,6 +1,8 @@
"""
Database connection setup
"""
import os
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
@ -27,20 +29,41 @@ ModelBase = declarative_base(metadata=metadata)
class DbContext:
def __init__(self, db_uri, debug=False):
# Create an engine and enable foreign key constraints in sqlite
self.engine = create_engine(db_uri, echo=debug)
"""Class encapsulating connections to the database."""
def __init__(self, path=None, uri=None, echo=False):
"""
Create a database context.
Exactly one of `path` and `uri` should be specified.
"""
if path and uri:
raise ValueError("Only one of path and uri may be specified")
self.db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
# Create an engine and enable foreign key constraints in sqlite
self.engine = create_engine(self.db_uri, echo=echo)
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
event.listens_for(self.engine, "connect")(set_sqlite_pragma)
# Create a thread-safe session factory
self.session = scoped_session(
sessionmaker(bind=self.engine), scopefunc=get_ident
)
sm = sessionmaker(bind=self.engine)
def add_lifecycle_hook(sm, from_state, to_state):
def object_lifecycle_hook(_, obj):
print(f"object moved from {from_state} to {to_state}: {obj}")
event.listens_for(sm, f"{from_state}_to_{to_state}")(object_lifecycle_hook)
if echo:
add_lifecycle_hook(sm, "persistent", "detached")
self.session = scoped_session(sm, scopefunc=get_ident)
def __call__(self, *args, **kwargs):
"""Provides shortcut access to session.execute."""

View File

@ -100,6 +100,25 @@ class User(ModelBase):
articles = relationship("Article", back_populates="user")
posts = relationship("Post", back_populates="user")
#########################
# Flask-Login interface #
#########################
@property
def is_authenticated(self: "User") -> bool:
return True
@property
def is_active(self: "User") -> bool:
return True
@property
def is_anonymous(self: "User") -> bool:
return False
def get_id(self: "User") -> str:
return str(self.id)
class Lexicon(ModelBase):
"""

View File

@ -1,104 +0,0 @@
"""
Submodule of functions for creating and managing lexicons within the
general Amanuensis context.
"""
import json
import logging
import os
import re
import time
from typing import Iterable
import uuid
from amanuensis.config import RootConfigDirectoryContext, AttrOrderedDict
from amanuensis.errors import ArgumentError
from amanuensis.models import ModelFactory, UserModel, LexiconModel
from amanuensis.resources import get_stream
logger = logging.getLogger(__name__)
def valid_name(name: str) -> bool:
"""
Validates that a lexicon name consists only of alpahnumerics, dashes,
underscores, and spaces
"""
return re.match(r'^[A-Za-z0-9-_ ]+$', name) is not None
def create_lexicon(
root: RootConfigDirectoryContext,
name: str,
editor: UserModel) -> LexiconModel:
"""
Creates a lexicon with the given name and sets the given user as its editor
"""
# Verify arguments
if not name:
raise ArgumentError(f'Empty lexicon name: "{name}"')
if not valid_name(name):
raise ArgumentError(f'Invalid lexicon name: "{name}"')
with root.lexicon.read_index() as extant_lexicons:
if name in extant_lexicons.keys():
raise ArgumentError(f'Lexicon name already taken: "{name}"')
if editor is None:
raise ArgumentError('Editor must not be None')
# Create the lexicon directory and initialize it with a blank lexicon
lid: str = uuid.uuid4().hex
lex_dir = os.path.join(root.lexicon.path, lid)
os.mkdir(lex_dir)
with get_stream("lexicon.json") as s:
path: str = os.path.join(lex_dir, 'config.json')
with open(path, 'wb') as f:
f.write(s.read())
# Create subdirectories
os.mkdir(os.path.join(lex_dir, 'draft'))
os.mkdir(os.path.join(lex_dir, 'src'))
os.mkdir(os.path.join(lex_dir, 'article'))
# Update the index with the new lexicon
with root.lexicon.edit_index() as index:
index[name] = lid
# Fill out the new lexicon
with root.lexicon[lid].edit_config() as cfg:
cfg.lid = lid
cfg.name = name
cfg.editor = editor.uid
cfg.time.created = int(time.time())
with root.lexicon[lid].edit('info', create=True):
pass # Create an empry config file
# Load the lexicon and add the editor and default character
model_factory: ModelFactory = ModelFactory(root)
lexicon = model_factory.lexicon(lid)
with lexicon.ctx.edit_config() as cfg:
cfg.join.joined.append(editor.uid)
with get_stream('character.json') as template:
character = json.load(template, object_pairs_hook=AttrOrderedDict)
character.cid = 'default'
character.name = 'Ersatz Scrivener'
character.player = None
cfg.character.new(character.cid, character)
# Log the creation
message = f'Created {lexicon.title}, ed. {editor.cfg.displayname} ({lid})'
lexicon.log(message)
logger.info(message)
return lexicon
def load_all_lexicons(
root: RootConfigDirectoryContext) -> Iterable[LexiconModel]:
"""
Iterably loads every lexicon in the config store
"""
model_factory: ModelFactory = ModelFactory(root)
with root.lexicon.read_index() as index:
for lid in index.values():
lexicon: LexiconModel = model_factory.lexicon(lid)
yield lexicon

View File

@ -1,11 +0,0 @@
from .factory import ModelFactory
from .lexicon import LexiconModel
from .user import UserModelBase, UserModel, AnonymousUserModel
__all__ = [member.__name__ for member in [
ModelFactory,
LexiconModel,
UserModelBase,
UserModel,
AnonymousUserModel,
]]

View File

@ -1,57 +0,0 @@
from typing import Optional
from amanuensis.config import is_guid, RootConfigDirectoryContext
from amanuensis.errors import ArgumentError
from .user import UserModel
from .lexicon import LexiconModel
class ModelFactory():
def __init__(self, root: RootConfigDirectoryContext):
self.root: RootConfigDirectoryContext = root
def try_user(self, identifier: str) -> Optional[UserModel]:
user: Optional[UserModel] = None
try:
user = self.user(identifier)
except Exception:
pass
finally:
return user
def user(self, identifier: str) -> UserModel:
"""Get the user model for the given id or username"""
# Ensure we have something to work with
if identifier is None:
raise ArgumentError('identifer must not be None')
# Ensure we have a user guid
if not is_guid(identifier):
with self.root.user.read_index() as index:
uid = index.get(identifier, None)
if uid is None:
raise KeyError(f'Unknown username: {identifier})')
if not is_guid(uid):
raise ValueError(f'Invalid index entry: {uid}')
else:
uid = identifier
user = UserModel(self.root, uid)
return user
def lexicon(self, identifier: str) -> LexiconModel:
"""Get the lexicon model for the given id or name"""
# Ensure we have something to work with
if identifier is None:
raise ArgumentError('identifier must not be None')
# Ensure we have a lexicon guid
if not is_guid(identifier):
with self.root.lexicon.read_index() as index:
lid = index.get(identifier, None)
if lid is None:
raise KeyError(f'Unknown lexicon: {identifier}')
if not is_guid(lid):
raise ValueError(f'Invalid index entry: {lid}')
else:
lid = identifier
lexicon = LexiconModel(self.root, lid)
return lexicon

View File

@ -1,64 +0,0 @@
import time
from typing import cast
from amanuensis.config import (
RootConfigDirectoryContext,
LexiconConfigDirectoryContext,
ReadOnlyOrderedDict)
class LexiconModel():
PREGAME = "unstarted"
ONGOING = "ongoing"
COMPLETE = "completed"
"""Represents a lexicon in the Amanuensis config store"""
def __init__(self, root: RootConfigDirectoryContext, lid: str):
self._lid: str = lid
# Creating the config context implicitly checks for existence
self._ctx: LexiconConfigDirectoryContext = (
cast(LexiconConfigDirectoryContext, root.lexicon[lid]))
with self._ctx.read_config() as config:
self._cfg: ReadOnlyOrderedDict = cast(ReadOnlyOrderedDict, config)
def __str__(self) -> str:
return f'<Lexicon {self.cfg.name}>'
def __repr__(self) -> str:
return f'<LexiconModel({self.lid})>'
# Properties
@property
def lid(self) -> str:
"""Lexicon guid"""
return self._lid
@property
def ctx(self) -> LexiconConfigDirectoryContext:
"""Lexicon config directory context"""
return self._ctx
@property
def cfg(self) -> ReadOnlyOrderedDict:
"""Cached lexicon config"""
return self._cfg
# Utilities
@property
def title(self) -> str:
return self.cfg.get('title') or f'Lexicon {self.cfg.name}'
def log(self, message: str) -> None:
now = int(time.time())
with self.ctx.edit_config() as cfg:
cfg.log.append([now, message])
@property
def status(self) -> str:
if self.cfg.turn.current is None:
return LexiconModel.PREGAME
if self.cfg.turn.current > self.cfg.turn.max:
return LexiconModel.COMPLETE
return LexiconModel.ONGOING

View File

@ -1,83 +0,0 @@
from typing import cast
from werkzeug.security import generate_password_hash, check_password_hash
from amanuensis.config import (
RootConfigDirectoryContext,
UserConfigDirectoryContext,
ReadOnlyOrderedDict)
class UserModelBase():
"""Common base class for auth and anon user models"""
# Properties
@property
def uid(self) -> str:
"""User guid"""
return getattr(self, '_uid', None)
@property
def ctx(self) -> UserConfigDirectoryContext:
"""User config directory context"""
return getattr(self, '_ctx', None)
@property
def cfg(self) -> ReadOnlyOrderedDict:
"""Cached user config"""
return getattr(self, '_cfg', None)
# Flask-Login interfaces
@property
def is_authenticated(self) -> bool:
return self.uid is not None
@property
def is_active(self) -> bool:
return self.uid is not None
@property
def is_anonymous(self) -> bool:
return self.uid is None
def get_id(self) -> str:
return self.uid
class UserModel(UserModelBase):
"""Represents a user in the Amanuensis config store"""
def __init__(self, root: RootConfigDirectoryContext, uid: str):
self._uid: str = uid
# Creating the config context implicitly checks for existence
self._ctx: UserConfigDirectoryContext = (
cast(UserConfigDirectoryContext, root.user[uid]))
with self._ctx.read_config() as config:
self._cfg: ReadOnlyOrderedDict = cast(ReadOnlyOrderedDict, config)
def __str__(self) -> str:
return f'<{self.cfg.username}>'
def __repr__(self) -> str:
return f'<UserModel({self.uid})>'
# Utility methods
def set_password(self, password: str) -> None:
pw_hash = generate_password_hash(password)
with self.ctx.edit_config() as cfg:
cfg['password'] = pw_hash
def check_password(self, password) -> bool:
with self.ctx.read_config() as cfg:
return check_password_hash(cfg.password, password)
class AnonymousUserModel(UserModelBase):
"""Represents an anonymous user"""
def __str__(self) -> str:
return '<Anonymous>'
def __repr__(self) -> str:
return '<AnonymousUserModel>'

View File

@ -2,5 +2,5 @@ import pkg_resources
def get_stream(*path):
rs_path = "/".join(path)
return pkg_resources.resource_stream(__name__, rs_path)
rs_path = "/".join(path)
return pkg_resources.resource_stream(__name__, rs_path)

View File

@ -1,13 +0,0 @@
{
"version": "0",
"aid": null,
"lexicon": null,
"character": null,
"title": null,
"turn": null,
"status": {
"ready": false,
"approved": false
},
"contents": null
}

View File

@ -1,7 +0,0 @@
{
"version": "0",
"cid": null,
"name": null,
"player": null,
"signature": null
}

View File

@ -1,15 +0,0 @@
{
"version": "0",
"secret_key": null,
"address": "127.0.0.1",
"port": "5000",
"lexicon_data": "./lexicon",
"static_root": "../resources",
"email": {
"server": null,
"port": null,
"tls": null,
"username": null,
"password": null
}
}

View File

@ -1,107 +0,0 @@
{
"version": "0",
"lid": null,
"name": null,
"title": null,
"editor": null,
"prompt": null,
"time": {
"created": null,
"completed": null
},
"turn": {
"current": null,
"max": 8,
"assignment": {
}
},
"join": {
"public": false,
"open": false,
"password": null,
"max_players": 4,
"chars_per_player": 1,
"joined": []
},
"publish": {
"notify": {
"editor_on_ready": true,
"player_on_reject": true,
"player_on_accept": false
},
"deadlines": null,
"asap": false,
"quorum": null,
"block_on_ready": true
},
"article": {
"index": {
"list": [
{
"type": "char",
"pri": 0,
"pattern": "ABC"
},
{
"type": "char",
"pri": 0,
"pattern": "DEF"
},
{
"type": "char",
"pri": 0,
"pattern": "GHI"
},
{
"type": "char",
"pri": 0,
"pattern": "JKL"
},
{
"type": "char",
"pri": 0,
"pattern": "MNO"
},
{
"type": "char",
"pri": 0,
"pattern": "PQRS"
},
{
"type": "char",
"pri": 0,
"pattern": "TUV"
},
{
"type": "char",
"pri": 0,
"pattern": "WXYZ"
}
],
"capacity": null
},
"citation": {
"allow_self": false,
"min_extant": null,
"max_extant": null,
"min_phantom": null,
"max_phantom": null,
"min_total": null,
"max_total": null,
"min_chars": null,
"max_chars": null
},
"word_limit": {
"soft": null,
"hard": null
},
"addendum": {
"allowed": false,
"max": null
}
},
"character": {
},
"log": [
]
}

View File

@ -1,13 +0,0 @@
{
"version": "0",
"uid": null,
"username": null,
"displayname": null,
"email": null,
"password": null,
"created": null,
"last_login": null,
"last_activity": null,
"new_password_required": true,
"is_admin": false
}

View File

@ -1,11 +1,15 @@
from datetime import datetime, timezone
import json
import os
from flask import Flask, g
import amanuensis.backend.lexicon
import amanuensis.backend.user
from amanuensis.config import AmanuensisConfig, CommandLineConfig
from amanuensis.db import DbContext
import amanuensis.server.home
import amanuensis.server.auth as auth
import amanuensis.server.home as home
def get_app(
@ -30,7 +34,7 @@ def get_app(
# Create the database context, if one wasn't already given
if db is None:
db = DbContext(app.config["DATABASE_URI"])
db = DbContext(uri=app.config["DATABASE_URI"])
# Make the database connection available to requests via g
def db_setup():
@ -47,11 +51,26 @@ def get_app(
# Configure jinja options
app.jinja_options.update(trim_blocks=True, lstrip_blocks=True)
def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str:
if dt is None:
return "never"
# Cast db time to UTC, then convert to local timezone
adjusted = dt.replace(tzinfo=timezone.utc).astimezone()
return adjusted.strftime(formatstr)
app.template_filter("date")(date_format)
def include_backend():
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
app.context_processor(include_backend)
# Set up Flask-Login
# TODO
auth.get_login_manager().init_app(app)
# Register blueprints
app.register_blueprint(amanuensis.server.home.bp)
app.register_blueprint(auth.bp)
app.register_blueprint(home.bp)
def test():
return "Hello, world!"

View File

@ -1,76 +1,79 @@
import logging
import time
from typing import Optional
from flask import (
Blueprint,
render_template,
redirect,
url_for,
flash,
current_app)
Blueprint,
flash,
g,
redirect,
render_template,
url_for,
)
from flask_login import (
login_user,
logout_user,
login_required,
LoginManager)
AnonymousUserMixin,
login_user,
logout_user,
login_required,
LoginManager,
)
from amanuensis.config import RootConfigDirectoryContext
from amanuensis.models import ModelFactory, AnonymousUserModel
import amanuensis.backend.user as userq
from amanuensis.db import User
from .forms import LoginForm
logger = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
bp = Blueprint("auth", __name__, url_prefix="/auth", template_folder=".")
def get_login_manager(root: RootConfigDirectoryContext) -> LoginManager:
"""
Creates a login manager
"""
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.anonymous_user = AnonymousUserModel
def get_login_manager() -> LoginManager:
"""Login manager factory"""
login_manager = LoginManager()
login_manager.login_view = "auth.login"
login_manager.anonymous_user = AnonymousUserMixin
@login_manager.user_loader
def load_user(uid):
return current_app.config['model_factory'].user(str(uid))
def load_user(user_id_str: str) -> Optional[User]:
try:
user_id = int(user_id_str)
except:
return None
return userq.from_id(g.db, user_id)
return login_manager
login_manager.user_loader(load_user)
return login_manager
bp_auth = Blueprint('auth', __name__,
url_prefix='/auth',
template_folder='.')
@bp_auth.route('/login/', methods=['GET', 'POST'])
@bp.route("/login/", methods=["GET", "POST"])
def login():
model_factory: ModelFactory = current_app.config['model_factory']
form = LoginForm()
form = LoginForm()
if not form.validate_on_submit():
# Either the request was GET and we should render the form,
# or the request was POST and validation failed.
return render_template('auth.login.jinja', form=form)
if not form.validate_on_submit():
# Either the request was GET and we should render the form,
# or the request was POST and validation failed.
return render_template("auth.login.jinja", form=form)
# POST with valid data
username = form.username.data
user = model_factory.try_user(username)
if not user or not user.check_password(form.password.data):
# Bad creds
flash("Login not recognized")
return redirect(url_for('auth.login'))
# POST with valid data
username: str = form.username.data
password: str = form.password.data
user: User = userq.from_username(g.db, username)
if not user or not userq.password_check(g.db, username, password):
# Bad creds
flash("Login not recognized")
return redirect(url_for("auth.login"))
# Login credentials were correct
remember_me = form.remember.data
login_user(user, remember=remember_me)
with user.ctx.edit_config() as cfg:
cfg.last_login = int(time.time())
logger.info('Logged in user "{0.username}" ({0.uid})'.format(user.cfg))
return redirect(url_for('home.home'))
# Login credentials were correct
remember_me: bool = form.remember.data
login_user(user, remember=remember_me)
userq.update_logged_in(g.db, username)
LOG.info("Logged in user {0.username} ({0.id})".format(user))
return redirect(url_for("home.home"))
@bp_auth.route("/logout/", methods=['GET'])
@bp.get("/logout/")
@login_required
def logout():
logout_user()
return redirect(url_for('home.home'))
logout_user()
return redirect(url_for("home.home"))

View File

@ -4,12 +4,9 @@ from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
"""/auth/login/"""
username = StringField(
'Username',
validators=[DataRequired()])
password = PasswordField(
'Password',
validators=[DataRequired()])
remember = BooleanField('Stay logged in')
submit = SubmitField('Log in')
"""/auth/login/"""
username = StringField("Username", validators=[DataRequired()])
password = PasswordField("Password", validators=[DataRequired()])
remember = BooleanField("Stay logged in")
submit = SubmitField("Log in")

View File

@ -21,13 +21,6 @@ def register_custom_filters(app):
val = getattr(user.cfg, attr)
return val
@app.template_filter("asdate")
def timestamp_to_readable(ts, formatstr="%Y-%m-%d %H:%M:%S"):
if ts is None:
return "null"
dt = datetime.fromtimestamp(ts)
return dt.strftime(formatstr)
@app.template_filter("articlelink")
def article_link(title):
return url_for(

View File

@ -1,43 +1,23 @@
from flask import Blueprint, render_template, g
# from flask import Blueprint, render_template, redirect, url_for, current_app
# from flask_login import login_required, current_user
import amanuensis.backend.user as userq
import amanuensis.backend.lexicon as lexiq
# from amanuensis.config import RootConfigDirectoryContext
# from amanuensis.lexicon import create_lexicon, load_all_lexicons
# from amanuensis.models import UserModel, ModelFactory
# from amanuensis.server.helpers import admin_required
# from .forms import LexiconCreateForm
bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".")
# @bp.get("/")
# def home():
# Show lexicons that are visible to the current user
# return "TODO"
# user_lexicons = []
# public_lexicons = []
# for lexicon in load_all_lexicons(root):
# if user.uid in lexicon.cfg.join.joined:
# user_lexicons.append(lexicon)
# elif lexicon.cfg.join.public:
# public_lexicons.append(lexicon)
# return render_template(
# 'home.root.jinja',
# user_lexicons=user_lexicons,
# public_lexicons=public_lexicons)
@bp.get("/")
def home():
return render_template("home.root.jinja")
@bp.get("/admin/")
# @login_required
# @admin_required
def admin():
return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq)
return render_template("home.admin.jinja", userq=userq, lexiq=lexiq)
# @bp_home.route("/admin/create/", methods=['GET', 'POST'])

View File

@ -2,16 +2,16 @@ from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField, TextAreaField
from wtforms.validators import DataRequired
from amanuensis.server.forms import User, Lexicon
# from amanuensis.server.forms import User, Lexicon
class LexiconCreateForm(FlaskForm):
"""/admin/create/"""
lexiconName = StringField(
'Lexicon name',
validators=[DataRequired(), Lexicon(should_exist=False)])
editorName = StringField(
'Username of editor',
validators=[DataRequired(), User(should_exist=True)])
promptText = TextAreaField('Prompt')
submit = SubmitField('Create')
# class LexiconCreateForm(FlaskForm):
# """/admin/create/"""
# lexiconName = StringField(
# 'Lexicon name',
# validators=[DataRequired(), Lexicon(should_exist=False)])
# editorName = StringField(
# 'Username of editor',
# validators=[DataRequired(), User(should_exist=True)])
# promptText = TextAreaField('Prompt')
# submit = SubmitField('Create')

View File

@ -4,17 +4,17 @@
{% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %}
{# TODO #}
{% block sb_home %}<a href="#{#{ url_for('home.home') }#}">Home</a>{% endblock %}
{% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>{% endblock %}
{% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %}
{% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %}
{% block main %}
<p>Users:</p>
{% for user in userq.get_all_users(db) %}
{% for user in userq.get_all(db) %}
{{ macros.dashboard_user_item(user) }}
{% endfor %}
<p>Lexicons:</p>
{% for lexicon in lexiq.get_all_lexicons(db) %}
{% for lexicon in lexiq.get_all(db) %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% endblock %}

View File

@ -11,10 +11,16 @@
<span style="color:#ff0000">{{ message }}</span><br>
{% endfor %}
{% if current_user.is_authenticated %}
{% set joined = lexiq.get_joined(db, current_user.id)|list %}
{% else %}
{% set joined = [] %}
{% endif %}
{% if current_user.is_authenticated %}
<h2>Your games</h2>
{% if user_lexicons %}
{% for lexicon in user_lexicons %}
{% if joined %}
{% for lexicon in joined %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -22,9 +28,10 @@
{% endif %}
{% endif %}
{% set public = lexiq.get_public(db)|reject("in", joined)|list %}
<h2>Public games</h2>
{% if public_lexicons %}
{% for lexicon in public_lexicons %}
{% if public %}
{% for lexicon in public %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -34,7 +41,7 @@
{% endblock %}
{% set template_content_blocks = [self.main()] %}
{% if current_user.cfg.is_admin %}
{% if current_user.is_site_admin %}
{% block admin_dash %}
<a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a>
{% endblock %}

View File

@ -3,45 +3,45 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p>
<span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">
{{ lexicon.full_title }}</a>
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
</span>
[{{ lexicon.status.capitalize() }}]
[{{ status.capitalize() }}]
</p>
<p><i>{{ lexicon.prompt }}</i></p>
{# {% if current_user.is_authenticated %} #}
{% if current_user.is_authenticated %}
<p>
{# TODO #}
{# {%
if current_user.uid in lexicon.cfg.join.joined
or current_user.cfg.is_admin
%} #}
Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} /
Players:
{# {% for uid in lexicon.cfg.join.joined %} #}
{# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #}
{# {% endfor %} #}
{# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #}
{# {% else %} #}
{# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #}
{# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #}
{# / <a href="{{ url_for('lexicon.join', name=lexicon.cfg.name) }}"> #}
{# Join game #}
{# </a> #}
{# {% endif %} #}
{# {% endif %} #}
{#-
Show detailed player information if the current user is a member of the lexicon or if the current user is a site admin. The filter sequence must be converted to a list because it returns a generator, which is truthy.
-#}
{%-
if lexicon.memberships|map(attribute="user_id")|select("equalto", current_user.id)|list
or current_user.is_site_admin
-%}
Editor: {{
lexicon.memberships|selectattr("is_editor")|map(attribute="user")|map(attribute="username")|join(", ")
}} / Players: {{
lexicon.memberships|map(attribute="user")|map(attribute="username")|join(", ")
}} ({{ lexicon.memberships|count }}
{%- if lexicon.player_limit is not none -%}
/{{ lexicon.player_limit }}
{%- endif -%})
{%- else -%}
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{%-
if lexicon.public and lexicon.joinable
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
{%- endif -%}
{%- endif -%}
</p>
{# {% endif %} #}
{% endif %}
</div>
{% endmacro %}
{% macro dashboard_user_item(user) %}
<div class="dashboard-lexicon-item">
<p>
<b>{{ user.username }}</b>
{% if user.username != user.display_name %} / {{ user.display_name }}{% endif %}
(id #{{user.id}})
<b>{{ user.username }}</b> {% if user.username != user.display_name %} / {{ user.display_name }}{% endif %} (id #{{user.id}}){% if user.is_site_admin %} <b>[ADMIN]</b>{% endif %}
</p>
<p>Last activity: {{ user.last_activity }} &mdash; Last login: {{ user.last_login }}</p>
<p>Last activity: {{ user.last_activity|date }} &mdash; Last login: {{ user.last_login|date }}</p>
</div>
{% endmacro %}

View File

@ -11,13 +11,12 @@
<div id="wrapper">
<div id="header">
<div id="login-status" {% block login_status_attr %}{% endblock %}>
{# TODO #}
{# {% if current_user.is_authenticated %}
<b>{{ current_user.cfg.username -}}</b>
{% if current_user.is_authenticated %}
<b>{{ current_user.username -}}</b>
(<a href="{{ url_for('auth.logout') }}">Logout</a>)
{% else %} #}
<a href="#{#{ url_for('auth.login') }#}">Login</a>
{# {% endif %} #}
{% else %}
<a href="{{ url_for('auth.login') }}">Login</a>
{% endif %}
</div>
{% block header %}{% endblock %}
</div>

View File

@ -1,4 +1,4 @@
[mypy]
ignore_missing_imports = true
exclude = "|amanuensis/lexicon/.*|amanuensis/models/.*|amanuensis/resources/.*|amanuensis/server/.*|amanuensis/user/.*|amanuensis/__main__.py|"
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
; mypy stable doesn't support pyproject.toml yet

121
poetry.lock generated
View File

@ -28,6 +28,21 @@ docs = ["furo", "sphinx", "zope.interface", "sphinx-notfound-page"]
tests = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins", "zope.interface"]
tests_no_zope = ["coverage[toml] (>=5.0.2)", "hypothesis", "pympler", "pytest (>=4.3.0)", "six", "mypy", "pytest-mypy-plugins"]
[[package]]
name = "beautifulsoup4"
version = "4.9.3"
description = "Screen-scraping library"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
soupsieve = {version = ">1.2", markers = "python_version >= \"3.0\""}
[package.extras]
html5lib = ["html5lib"]
lxml = ["lxml"]
[[package]]
name = "black"
version = "21.6b0"
@ -50,6 +65,17 @@ d = ["aiohttp (>=3.6.0)", "aiohttp-cors (>=0.4.0)"]
python2 = ["typed-ast (>=1.4.2)"]
uvloop = ["uvloop (>=0.15.2)"]
[[package]]
name = "bs4"
version = "0.0.1"
description = "Dummy package for Beautiful Soup"
category = "dev"
optional = false
python-versions = "*"
[package.dependencies]
beautifulsoup4 = "*"
[[package]]
name = "click"
version = "8.0.1"
@ -100,17 +126,20 @@ Flask = "*"
[[package]]
name = "flask-wtf"
version = "0.14.3"
version = "0.15.1"
description = "Simple integration of Flask and WTForms."
category = "main"
optional = false
python-versions = "*"
python-versions = ">= 3.6"
[package.dependencies]
Flask = "*"
itsdangerous = "*"
WTForms = "*"
[package.extras]
email = ["email-validator"]
[[package]]
name = "greenlet"
version = "1.1.0"
@ -260,9 +289,17 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "soupsieve"
version = "2.2.1"
description = "A modern CSS selector implementation for Beautiful Soup."
category = "dev"
optional = false
python-versions = ">=3.6"
[[package]]
name = "sqlalchemy"
version = "1.4.18"
version = "1.4.19"
description = "Database Abstraction Library"
category = "main"
optional = false
@ -353,7 +390,7 @@ locale = ["Babel (>=1.3)"]
[metadata]
lock-version = "1.1"
python-versions = "^3.8"
content-hash = "493d96d9f3aa7056057b41877a76b5d4c4bcbd7f0a3c2864e4221024547ded87"
content-hash = "97e970853a3db968f05e70b83348d52d1a5aaed12a844b30cc15d039827233d4"
[metadata.files]
appdirs = [
@ -368,10 +405,18 @@ attrs = [
{file = "attrs-21.2.0-py2.py3-none-any.whl", hash = "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1"},
{file = "attrs-21.2.0.tar.gz", hash = "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb"},
]
beautifulsoup4 = [
{file = "beautifulsoup4-4.9.3-py2-none-any.whl", hash = "sha256:4c98143716ef1cb40bf7f39a8e3eec8f8b009509e74904ba3a7b315431577e35"},
{file = "beautifulsoup4-4.9.3-py3-none-any.whl", hash = "sha256:fff47e031e34ec82bf17e00da8f592fe7de69aeea38be00523c04623c04fb666"},
{file = "beautifulsoup4-4.9.3.tar.gz", hash = "sha256:84729e322ad1d5b4d25f805bfa05b902dd96450f43842c4e99067d5e1369eb25"},
]
black = [
{file = "black-21.6b0-py3-none-any.whl", hash = "sha256:dfb8c5a069012b2ab1e972e7b908f5fb42b6bbabcba0a788b86dc05067c7d9c7"},
{file = "black-21.6b0.tar.gz", hash = "sha256:dc132348a88d103016726fe360cb9ede02cecf99b76e3660ce6c596be132ce04"},
]
bs4 = [
{file = "bs4-0.0.1.tar.gz", hash = "sha256:36ecea1fd7cc5c0c6e4a1ff075df26d50da647b75376626cc186e2212886dd3a"},
]
click = [
{file = "click-8.0.1-py3-none-any.whl", hash = "sha256:fba402a4a47334742d782209a7c79bc448911afe1149d07bdabdf480b3e2f4b6"},
{file = "click-8.0.1.tar.gz", hash = "sha256:8c04c11192119b1ef78ea049e0a6f0463e4c48ef00a30160c704337586f3ad7a"},
@ -389,8 +434,8 @@ flask-login = [
{file = "Flask_Login-0.5.0-py2.py3-none-any.whl", hash = "sha256:7451b5001e17837ba58945aead261ba425fdf7b4f0448777e597ddab39f4fba0"},
]
flask-wtf = [
{file = "Flask-WTF-0.14.3.tar.gz", hash = "sha256:d417e3a0008b5ba583da1763e4db0f55a1269d9dd91dcc3eb3c026d3c5dbd720"},
{file = "Flask_WTF-0.14.3-py2.py3-none-any.whl", hash = "sha256:57b3faf6fe5d6168bda0c36b0df1d05770f8e205e18332d0376ddb954d17aef2"},
{file = "Flask-WTF-0.15.1.tar.gz", hash = "sha256:ff177185f891302dc253437fe63081e7a46a4e99aca61dfe086fb23e54fff2dc"},
{file = "Flask_WTF-0.15.1-py2.py3-none-any.whl", hash = "sha256:6ff7af73458f182180906a37a783e290bdc8a3817fe4ad17227563137ca285bf"},
]
greenlet = [
{file = "greenlet-1.1.0-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:60848099b76467ef09b62b0f4512e7e6f0a2c977357a036de602b653667f5f4c"},
@ -586,37 +631,41 @@ regex = [
{file = "regex-2021.4.4-cp39-cp39-win_amd64.whl", hash = "sha256:97f29f57d5b84e73fbaf99ab3e26134e6687348e95ef6b48cfd2c06807005a07"},
{file = "regex-2021.4.4.tar.gz", hash = "sha256:52ba3d3f9b942c49d7e4bc105bb28551c44065f139a65062ab7912bef10c9afb"},
]
soupsieve = [
{file = "soupsieve-2.2.1-py3-none-any.whl", hash = "sha256:c2c1c2d44f158cdbddab7824a9af8c4f83c76b1e23e049479aa432feb6c4c23b"},
{file = "soupsieve-2.2.1.tar.gz", hash = "sha256:052774848f448cf19c7e959adf5566904d525f33a3f8b6ba6f6f8f26ec7de0cc"},
]
sqlalchemy = [
{file = "SQLAlchemy-1.4.18-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:d76abceeb6f7c564fdbc304b1ce17ec59664ca7ed0fe6dbc6fc6a960c91370e3"},
{file = "SQLAlchemy-1.4.18-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:4cdc91bb3ee5b10e24ec59303131b791f3f82caa4dd8b36064d1918b0f4d0de4"},
{file = "SQLAlchemy-1.4.18-cp27-cp27m-win32.whl", hash = "sha256:3690fc0fc671419debdae9b33df1434ac9253155fd76d0f66a01f7b459d56ee6"},
{file = "SQLAlchemy-1.4.18-cp27-cp27m-win_amd64.whl", hash = "sha256:5b827d3d1d982b38d2bab551edf9893c4734b5db9b852b28d3bc809ea7e179f6"},
{file = "SQLAlchemy-1.4.18-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:495cce8174c670f1d885e2259d710b0120888db2169ea14fc32d1f72e7950642"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:60cfe1fb59a34569816907cb25bb256c9490824679c46777377bcc01f6813a81"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f3357948fa439eb5c7241a8856738605d7ab9d9f276ca5c5cc3220455a5f8e6c"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:93394d68f02ecbf8c0a4355b6452793000ce0ee7aef79d2c85b491da25a88af7"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56958dd833145f1aa75f8987dfe0cf6f149e93aa31967b7004d4eb9cb579fefc"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-win32.whl", hash = "sha256:664c6cc84a5d2bad2a4a3984d146b6201b850ba0a7125b2fcd29ca06cddac4b1"},
{file = "SQLAlchemy-1.4.18-cp36-cp36m-win_amd64.whl", hash = "sha256:77549e5ae996de50ad9f69f863c91daf04842b14233e133335b900b152bffb07"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:e2aa39fdf5bff1c325a8648ac1957a0320c66763a3fa5f0f4a02457b2afcf372"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffb18eb56546aa66640fef831e5d0fe1a8dfbf11cdf5b00803826a01dbbbf3b1"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:cc474d0c40cef94d9b68980155d686d5ad43a9ca0834a8729052d3585f289d57"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5d4b2c23d20acf631456e645227cef014e7f84a111118d530cfa1d6053fd05a9"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-win32.whl", hash = "sha256:45bbb935b305e381bcb542bf4d952232282ba76881e3458105e4733ba0976060"},
{file = "SQLAlchemy-1.4.18-cp37-cp37m-win_amd64.whl", hash = "sha256:3a6afb7a55374329601c8fcad277f0a47793386255764431c8f6a231a6947ee9"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:9a62b06ad450386a2e671d0bcc5cd430690b77a5cd41c54ede4e4bf46d7a4978"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:70674f2ff315a74061da7af1225770578d23f4f6f74dd2e1964493abd8d804bc"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:4f375c52fed5f2ecd06be18756f121b3167a1fdc4543d877961fba04b1713214"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eba098a4962e1ab0d446c814ae67e30da82c446b382cf718306cc90d4e2ad85f"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-win32.whl", hash = "sha256:ee3428f6100ff2b07e7ecec6357d865a4d604c801760094883587ecdbf8a3533"},
{file = "SQLAlchemy-1.4.18-cp38-cp38-win_amd64.whl", hash = "sha256:5c62fff70348e3f8e4392540d31f3b8c251dc8eb830173692e5d61896d4309d6"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:8924d552decf1a50d57dca4984ebd0778a55ca2cb1c0ef16df8c1fed405ff290"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:284b6df04bc30e886998e0fdbd700ef9ffb83bcb484ffc54d4084959240dce91"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:146af9e67d0f821b28779d602372e65d019db01532d8f7101e91202d447c14ec"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2129d33b54da4d4771868a3639a07f461adc5887dbd9e0a80dbf560272245525"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-win32.whl", hash = "sha256:0653d444d52f2b9a0cba1ea5cd0fc64e616ee3838ee86c1863781b2a8670fc0c"},
{file = "SQLAlchemy-1.4.18-cp39-cp39-win_amd64.whl", hash = "sha256:c824d14b52000597dfcced0a4e480fd8664b09fed606e746a2c67fe5fbe8dfd9"},
{file = "SQLAlchemy-1.4.18.tar.gz", hash = "sha256:d25210f5f1a6b7b6b357d8fa199fc1d5be828c67cc1af517600c02e5b2727e4c"},
{file = "SQLAlchemy-1.4.19-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:ddbce8fe4d0190db21db602e38aaf4c158c540b49f1ef7475323ec682a9fbf2d"},
{file = "SQLAlchemy-1.4.19-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:942ca49b7ec7449d2473a6587825c55ad99534ddfc4eee249dd42be3cc1aa8c9"},
{file = "SQLAlchemy-1.4.19-cp27-cp27m-win32.whl", hash = "sha256:9c0945c79cbe507b49524e31a4bb8700060bbccb60bb553df6432e176baff3d5"},
{file = "SQLAlchemy-1.4.19-cp27-cp27m-win_amd64.whl", hash = "sha256:6fd1b745ade2020a1a7bf1e22536d8afe86287882c81ca5d860bdf231d5854e9"},
{file = "SQLAlchemy-1.4.19-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:0fb3f73e5009f5a4c9b24469939d3d57cc3ad8099a09c0cfefc47fe45ab7ffbe"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:64eab458619ef759f16f0f82242813d3289e829f8557fbc7c212ca4eadf96472"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:311051c06f905774427b4a92dcb3924d6ee563dea3a88176da02fdfc572d0d1d"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:a34a7fd3353ee61a1dca72fc0c3e38d4e56bdc2c343e712f60a8c70acd4ef5bf"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ace9ab2af9d7d7b0e2ff2178809941c56ab8921e38128278192a73a8a1c08a2"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-win32.whl", hash = "sha256:96d3d4a7ead376d738775a1fa9786dc17a31975ec664cea284e53735c79a5686"},
{file = "SQLAlchemy-1.4.19-cp36-cp36m-win_amd64.whl", hash = "sha256:20f4bf1459548a74aade997cb045015e4d72f0fde1789b09b3bb380be28f6511"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:8cba69545246d16c6d2a12ce45865947cbdd814bacddf2e532fdd4512e70728c"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:57ba8a96b6d058c7dcf44de8ac0955b7a787f7177a0221dd4b8016e0191268f5"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:8f1e7f4de05c15d6b46af12f3cf0c2552f2940d201a49926703249a62402d851"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5c92d9ebf4b38c22c0c9e4f203a80e101910a50dc555b4578816932015b97d7f"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-win32.whl", hash = "sha256:c6efc7477551ba9ce632d5c3b448b7de0277c86005eec190a1068fcc7115fd0e"},
{file = "SQLAlchemy-1.4.19-cp37-cp37m-win_amd64.whl", hash = "sha256:e2761b925fda550debfd5a8bc3cef9debc9a23c6a280429c4ec3a07c35c6b4b3"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:58d4f79d119010fdced6e7fd7e4b9f2230dbf55a8235d7c58b1c8207ef74791b"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cefd44faca7c57534503261f6fab49bd47eb9c2945ee0bab09faaa8cb047c24f"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:9133635edcec1e7fbfc16eba5dc2b5b3b11818d25b7a57cfcbfa8d3b3e9594fd"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d3cf5f543d048a7c8da500133068c5c90c97a2c4bf0c027928a85028a519f33d"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-win32.whl", hash = "sha256:d04160462f874eaa4d88721a0d5ecca8ebf433616801efe779f252ef87b0e216"},
{file = "SQLAlchemy-1.4.19-cp38-cp38-win_amd64.whl", hash = "sha256:45b0f773e195d8d51e2fd67cb5b5fb32f5a1f5e7f0752016207091bed108909a"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:93ba458b3c279581288a10a55df2aa6ac3509882228fcbad9d9d88069f899337"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6317701c06a829b066c794545512bb70b1a10a74574cfa5658a0aaf49f31aa93"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:95a9fd0a11f89a80d8815418eccba034f3fec8ea1f04c41b6b8decc5c95852e9"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9014fd1d8aebcb4eb6bc69a382dd149200e1d5924412b1d08b4443f6c1ce526f"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-win32.whl", hash = "sha256:fa05a77662c23226c9ec031638fd90ae767009e05cd092b948740f09d10645f0"},
{file = "SQLAlchemy-1.4.19-cp39-cp39-win_amd64.whl", hash = "sha256:d7b21a4b62921cf6dca97e8f9dea1fbe2432aebbb09895a2bd4f527105af41a4"},
{file = "SQLAlchemy-1.4.19.tar.gz", hash = "sha256:89a5a13dcf33b7e47c7a9404a297c836965a247c7f076a0fe0910cae2bee5ce2"},
]
toml = [
{file = "toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b"},

View File

@ -8,24 +8,25 @@ authors = ["Tim Van Baak <tim.vanbaak@gmail.com>"]
python = "^3.8"
Flask = "^2.0.1"
Flask-Login = "^0.5.0"
Flask-WTF = "^0.14.3"
Flask-WTF = "^0.15.1"
SQLAlchemy = "^1.4.12"
[tool.poetry.dev-dependencies]
pytest = "^5.2"
black = "^21.5b2"
mypy = "^0.812"
bs4 = "^0.0.1"
[tool.poetry.scripts]
amanuensis-cli = "amanuensis.cli:main"
amanuensis-server = "amanuensis.server:run"
[tool.black]
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/models/.*|^/amanuensis/resources/.*|^/amanuensis/server/.*|^/amanuensis/user/.*|^/amanuensis/__main__.py"
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/lexicon/.*|^/amanuensis/server/session/.*|"
[tool.mypy]
ignore_missing_imports = true
exclude = "amanuensis/cli/.*|amanuensis/config/.*|amanuensis/lexicon/.*|amanuensis/log/.*|amanuensis/models/.*|amanuensis/resources/.*|amanuensis/server/.*|amanuensis/user/.*|amanuensis/__main__.py"
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
[tool.pytest.ini_options]
addopts = "--show-capture=log"

View File

@ -1,5 +1,5 @@
[pytest]
addopts = --show-capture=log
addopts = --show-capture=stdout
; pytest should be able to read the pyproject.toml file, but for some reason it
; doesn't seem to be working here. This file is a temporary fix until that gets
; resolved.

View File

@ -1,10 +1,9 @@
from amanuensis.db.models import Lexicon
import datetime
import pytest
from amanuensis.db import DbContext
import amanuensis.backend.lexicon as lexiq
from amanuensis.db import DbContext, Lexicon, User
from amanuensis.errors import ArgumentError
@ -51,3 +50,50 @@ def test_create_lexicon(db: DbContext):
# No duplicate lexicon names
with pytest.raises(ArgumentError):
lexiq.create(**defaults)
def test_lexicon_from(db: DbContext, make):
"""Test lexiq.from_*."""
lexicon1: Lexicon = make.lexicon()
lexicon2: Lexicon = make.lexicon()
assert lexiq.from_name(db, lexicon1.name) == lexicon1
assert lexiq.from_name(db, lexicon2.name) == lexicon2
def test_get_lexicon(db: DbContext, make):
"""Test the various scoped get functions."""
user: User = make.user()
public_joined: Lexicon = make.lexicon()
public_joined.public = True
make.membership(user_id=user.id, lexicon_id=public_joined.id)
private_joined: Lexicon = make.lexicon()
private_joined.public = False
make.membership(user_id=user.id, lexicon_id=private_joined.id)
public_open: Lexicon = make.lexicon()
public_open.public = True
db.session.commit()
private_open: Lexicon = make.lexicon()
private_open.public = False
db.session.commit()
get_all = list(lexiq.get_all(db))
assert public_joined in get_all
assert private_joined in get_all
assert public_open in get_all
assert private_open in get_all
get_joined = list(lexiq.get_joined(db, user.id))
assert public_joined in get_joined
assert private_joined in get_joined
assert public_open not in get_joined
assert private_open not in get_joined
get_public = list(lexiq.get_public(db))
assert public_joined in get_public
assert private_joined not in get_public
assert public_open in get_public
assert private_open not in get_public

View File

@ -1,8 +1,9 @@
from amanuensis.db.models import User
import os
import pytest
from amanuensis.db import DbContext
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, User
from amanuensis.errors import ArgumentError
@ -50,3 +51,26 @@ def test_create_user(db: DbContext):
user2_kw: dict = {**defaults, "username": "user2", "display_name": None}
user2: User = userq.create(**user2_kw)
assert user2.display_name is not None
def test_user_from(db: DbContext, make):
"""Test userq.from_*."""
user1: User = make.user()
user2: User = make.user()
assert userq.from_id(db, user1.id) == user1
assert userq.from_username(db, user1.username) == user1
assert userq.from_id(db, user2.id) == user2
assert userq.from_username(db, user2.username) == user2
def test_user_password(db: DbContext, make):
"""Test user password functions."""
pw1 = os.urandom(8).hex()
pw2 = os.urandom(8).hex()
user: User = make.user(password=pw1)
assert userq.password_check(db, user.username, pw1)
assert not userq.password_check(db, user.username, pw2)
userq.password_set(db, user.username, pw2)
assert not userq.password_check(db, user.username, pw1)
assert userq.password_check(db, user.username, pw2)

View File

@ -1,31 +1,88 @@
"""
pytest test fixtures
"""
import os
import pytest
import tempfile
from typing import Optional
from bs4 import BeautifulSoup
from flask.testing import FlaskClient
from sqlalchemy.orm.session import close_all_sessions
from amanuensis.db import DbContext
import amanuensis.backend.character as charq
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.config import AmanuensisConfig
from amanuensis.db import DbContext, User, Lexicon, Membership, Character
from amanuensis.server import get_app
@pytest.fixture
def db() -> DbContext:
"""Provides an initialized database in memory."""
db = DbContext("sqlite:///:memory:", debug=False)
def db(request) -> DbContext:
"""Provides a fully-initialized ephemeral database."""
db_fd, db_path = tempfile.mkstemp()
db = DbContext(path=db_path, echo=False)
db.create_all()
def db_teardown():
close_all_sessions()
os.close(db_fd)
os.unlink(db_path)
request.addfinalizer(db_teardown)
return db
@pytest.fixture
def make_user(db: DbContext):
"""Provides a factory function for creating users, with valid default values."""
class UserClient:
"""Class encapsulating user web operations."""
def user_factory(state={"nonce": 1}, **kwargs):
default_kwargs = {
def __init__(self, db: DbContext, user_id: int):
self.db = db
self.user_id = user_id
def login(self, client: FlaskClient):
"""Log the user in."""
user: Optional[User] = userq.from_id(self.db, self.user_id)
assert user is not None
# Set the user's password so we know what it is later
password = os.urandom(8).hex()
userq.password_set(self.db, user.username, password)
# Log in
response = client.get("/auth/login/")
assert response.status_code == 200
soup = BeautifulSoup(response.data, features="html.parser")
csrf_token = soup.find(id="csrf_token")
assert csrf_token is not None
response = client.post(
"/auth/login/",
data={
"username": user.username,
"password": password,
"csrf_token": csrf_token["value"],
},
)
assert 300 <= response.status_code <= 399
def logout(self, client: FlaskClient):
"""Log the user out."""
response = client.get("/auth/logout/")
assert 300 <= response.status_code <= 399
class ObjectFactory:
"""Factory class."""
def __init__(self, db):
self.db = db
def user(self, state={"nonce": 1}, **kwargs) -> User:
"""Factory function for creating users, with valid default values."""
default_kwargs: dict = {
"username": f'test_user_{state["nonce"]}',
"password": "password",
"display_name": None,
@ -33,87 +90,54 @@ def make_user(db: DbContext):
"is_site_admin": False,
}
state["nonce"] += 1
updated_kwargs = {**default_kwargs, **kwargs}
return userq.create(db, **updated_kwargs)
updated_kwargs: dict = {**default_kwargs, **kwargs}
return userq.create(self.db, **updated_kwargs)
return user_factory
@pytest.fixture
def make_lexicon(db: DbContext):
"""Provides a factory function for creating lexicons, with valid default values."""
def lexicon_factory(state={"nonce": 1}, **kwargs):
default_kwargs = {
def lexicon(self, state={"nonce": 1}, **kwargs) -> Lexicon:
"""Factory function for creating lexicons, with valid default values."""
default_kwargs: dict = {
"name": f'Test_{state["nonce"]}',
"title": None,
"prompt": f'Test Lexicon game {state["nonce"]}',
}
state["nonce"] += 1
updated_kwargs = {**default_kwargs, **kwargs}
lex = lexiq.create(db, **updated_kwargs)
updated_kwargs: dict = {**default_kwargs, **kwargs}
lex = lexiq.create(self.db, **updated_kwargs)
lex.joinable = True
db.session.commit()
self.db.session.commit()
return lex
return lexicon_factory
@pytest.fixture
def make_membership(db: DbContext):
"""Provides a factory function for creating memberships, with valid default values."""
def membership_factory(**kwargs):
default_kwargs = {
def membership(self, **kwargs) -> Membership:
"""Factory function for creating memberships, with valid default values."""
default_kwargs: dict = {
"is_editor": False,
}
updated_kwargs = {**default_kwargs, **kwargs}
return memq.create(db, **updated_kwargs)
updated_kwargs: dict = {**default_kwargs, **kwargs}
return memq.create(self.db, **updated_kwargs)
return membership_factory
@pytest.fixture
def make_character(db: DbContext):
"""Provides a factory function for creating characters, with valid default values."""
def character_factory(state={"nonce": 1}, **kwargs):
default_kwargs = {
def character(self, state={"nonce": 1}, **kwargs) -> Character:
"""Factory function for creating characters, with valid default values."""
default_kwargs: dict = {
"name": f'Character {state["nonce"]}',
"signature": None,
}
state["nonce"] += 1
updated_kwargs = {**default_kwargs, **kwargs}
return charq.create(db, **updated_kwargs)
updated_kwargs: dict = {**default_kwargs, **kwargs}
return charq.create(self.db, **updated_kwargs)
return character_factory
class TestFactory:
def __init__(self, db, **factories):
self.db = db
self.factories = factories
def __getattr__(self, name):
return self.factories[name]
def client(self, user_id: int) -> UserClient:
"""Factory function for user test clients."""
return UserClient(self.db, user_id)
@pytest.fixture
def make(
db: DbContext, make_user, make_lexicon, make_membership, make_character
) -> TestFactory:
"""Fixture that groups all factory fixtures together."""
return TestFactory(
db,
user=make_user,
lexicon=make_lexicon,
membership=make_membership,
character=make_character,
)
def make(db: DbContext) -> ObjectFactory:
"""Fixture that provides a factory class."""
return ObjectFactory(db)
@pytest.fixture
def lexicon_with_editor(make):
def lexicon_with_editor(make: ObjectFactory):
"""Shortcut setup for a lexicon game with an editor."""
editor = make.user()
assert editor
@ -128,12 +152,10 @@ def lexicon_with_editor(make):
class TestConfig(AmanuensisConfig):
TESTING = True
SECRET_KEY = "secret key"
DATABASE_URI = "sqlite:///:memory:"
SECRET_KEY = os.urandom(32).hex()
@pytest.fixture
def app(db):
def app(db: DbContext):
"""Provides an application running on top of the test database."""
server_app = get_app(TestConfig, db)
return server_app
return get_app(TestConfig(), db)

56
tests/test_auth.py Normal file
View File

@ -0,0 +1,56 @@
import os
from urllib.parse import urlsplit
from bs4 import BeautifulSoup
from flask import Flask
from amanuensis.db import User
def test_auth_circuit(app: Flask, make):
"""Test the user login/logout path."""
username: str = f"user_{os.urandom(8).hex()}"
ub: bytes = username.encode("utf8")
assert make.user(username=username, password=username)
with app.test_client() as client:
# User should not be logged in
response = client.get("/home/")
assert response.status_code == 200
assert ub not in response.data
# The login page exists
response = client.get("/auth/login/")
assert response.status_code == 200
assert ub not in response.data
assert b"Username" in response.data
assert b"Username" in response.data
assert b"csrf_token" in response.data
# Get the csrf token for logging in
soup = BeautifulSoup(response.data, features="html.parser")
csrf_token = soup.find(id="csrf_token")["value"]
assert csrf_token
# Log the user in
response = client.post(
"/auth/login/",
data={"username": username, "password": username, "csrf_token": csrf_token},
)
assert 300 <= response.status_code <= 399
assert urlsplit(response.location).path == "/home/"
# Confirm that the user is logged in
response = client.get("/home/")
assert response.status_code == 200
assert ub in response.data
# Log the user out
response = client.get("/auth/logout/")
assert 300 <= response.status_code <= 399
assert urlsplit(response.location).path == "/home/"
# Confirm the user is logged out
response = client.get("/home/")
assert response.status_code == 200
assert ub not in response.data

45
tests/test_home.py Normal file
View File

@ -0,0 +1,45 @@
import os
from urllib.parse import urlsplit
from flask import Flask
from amanuensis.db import DbContext, User, Lexicon
from .conftest import ObjectFactory, UserClient
def test_game_visibility(db: DbContext, app: Flask, make: ObjectFactory):
"""Test lexicon visibility settings."""
user: User = make.user()
auth: UserClient = make.client(user.id)
public_joined: Lexicon = make.lexicon()
public_joined.public = True
make.membership(user_id=auth.user_id, lexicon_id=public_joined.id)
public_joined_title = public_joined.full_title
private_joined: Lexicon = make.lexicon()
private_joined.public = False
make.membership(user_id=auth.user_id, lexicon_id=private_joined.id)
private_joined_title = private_joined.full_title
public_open: Lexicon = make.lexicon()
public_open.public = True
db.session.commit()
public_open_title = public_open.full_title
private_open: Lexicon = make.lexicon()
private_open.public = False
db.session.commit()
private_open_title = private_open.full_title
with app.test_client() as client:
auth.login(client)
# Check that lexicons appear if they should
response = client.get("/home/")
assert response.status_code == 200
assert public_joined_title.encode("utf8") in response.data
assert private_joined_title.encode("utf8") in response.data
assert public_open_title.encode("utf8") in response.data
assert private_open_title.encode("utf8") not in response.data