Compare commits
6 Commits
b13d89a2b0
...
2077feac62
Author | SHA1 | Date |
---|---|---|
Tim Van Baak | 2077feac62 | |
Tim Van Baak | 6085897c23 | |
Tim Van Baak | 7645c85c9d | |
Tim Van Baak | d6f558a92b | |
Tim Van Baak | 74fe79dbf9 | |
Tim Van Baak | 06d662982c |
|
@ -0,0 +1,9 @@
|
||||||
|
import amanuensis.backend.article as artiq
|
||||||
|
import amanuensis.backend.character as charq
|
||||||
|
import amanuensis.backend.index as indq
|
||||||
|
import amanuensis.backend.lexicon as lexiq
|
||||||
|
import amanuensis.backend.membership as memq
|
||||||
|
import amanuensis.backend.post as postq
|
||||||
|
import amanuensis.backend.user as userq
|
||||||
|
|
||||||
|
__all__ = ["artiq", "charq", "indq", "lexiq", "memq", "postq", "userq"]
|
|
@ -7,7 +7,7 @@ from typing import Optional
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
from amanuensis.db import *
|
from amanuensis.db import *
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
|
@ -21,11 +21,11 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify argument types are correct
|
# Verify argument types are correct
|
||||||
if not isinstance(lexicon_id, int):
|
if not isinstance(lexicon_id, int):
|
||||||
raise ArgumentError("lexicon_id")
|
raise BackendArgumentTypeError(int, lexicon_id=lexicon_id)
|
||||||
if not isinstance(user_id, int):
|
if not isinstance(user_id, int):
|
||||||
raise ArgumentError("user_id")
|
raise BackendArgumentTypeError(int, user_id=user_id)
|
||||||
if character_id is not None and not isinstance(character_id, int):
|
if character_id is not None and not isinstance(character_id, int):
|
||||||
raise ArgumentError("character_id")
|
raise BackendArgumentTypeError(int, character_id=character_id)
|
||||||
|
|
||||||
# Check that the user is a member of this lexicon
|
# Check that the user is a member of this lexicon
|
||||||
mem: Membership = db(
|
mem: Membership = db(
|
||||||
|
|
|
@ -7,7 +7,7 @@ from typing import Optional
|
||||||
from sqlalchemy import select, func
|
from sqlalchemy import select, func
|
||||||
|
|
||||||
from amanuensis.db import *
|
from amanuensis.db import *
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
|
@ -22,13 +22,13 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify argument types are correct
|
# Verify argument types are correct
|
||||||
if not isinstance(lexicon_id, int):
|
if not isinstance(lexicon_id, int):
|
||||||
raise ArgumentError("lexicon_id")
|
raise BackendArgumentTypeError(int, lexicon_id=lexicon_id)
|
||||||
if not isinstance(user_id, int):
|
if not isinstance(user_id, int):
|
||||||
raise ArgumentError("user_id")
|
raise BackendArgumentTypeError(int, user_id=user_id)
|
||||||
if not isinstance(name, str):
|
if not isinstance(name, str):
|
||||||
raise ArgumentError("name")
|
raise BackendArgumentTypeError(str, name=name)
|
||||||
if signature is not None and not isinstance(signature, str):
|
if signature is not None and not isinstance(signature, str):
|
||||||
raise ArgumentError("signature")
|
raise BackendArgumentTypeError(str, signature=signature)
|
||||||
|
|
||||||
# Verify character name is valid
|
# Verify character name is valid
|
||||||
if not name.strip():
|
if not name.strip():
|
||||||
|
|
|
@ -6,7 +6,7 @@ import re
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from amanuensis.db import DbContext, ArticleIndex, IndexType
|
from amanuensis.db import DbContext, ArticleIndex, IndexType
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
|
@ -23,17 +23,17 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify argument types are correct
|
# Verify argument types are correct
|
||||||
if not isinstance(lexicon_id, int):
|
if not isinstance(lexicon_id, int):
|
||||||
raise ArgumentError("lexicon_id")
|
raise BackendArgumentTypeError(int, lexicon_id=lexicon_id)
|
||||||
if not isinstance(index_type, IndexType):
|
if not isinstance(index_type, IndexType):
|
||||||
raise ArgumentError("index_type")
|
raise BackendArgumentTypeError(IndexType, index_type=index_type)
|
||||||
if not isinstance(pattern, str):
|
if not isinstance(pattern, str):
|
||||||
raise ArgumentError("pattern")
|
raise BackendArgumentTypeError(str, pattern=pattern)
|
||||||
if not isinstance(logical_order, int):
|
if not isinstance(logical_order, int):
|
||||||
raise ArgumentError("logical_order")
|
raise BackendArgumentTypeError(int, logical_order=logical_order)
|
||||||
if not isinstance(display_order, int):
|
if not isinstance(display_order, int):
|
||||||
raise ArgumentError("display_order")
|
raise BackendArgumentTypeError(int, display_order=display_order)
|
||||||
if capacity is not None and not isinstance(capacity, int):
|
if capacity is not None and not isinstance(capacity, int):
|
||||||
raise ArgumentError("capacity")
|
raise BackendArgumentTypeError(int, capacity=capacity)
|
||||||
|
|
||||||
# Verify the pattern is valid for the index type:
|
# Verify the pattern is valid for the index type:
|
||||||
if index_type == IndexType.CHAR:
|
if index_type == IndexType.CHAR:
|
||||||
|
|
|
@ -5,10 +5,11 @@ Lexicon query interface
|
||||||
import re
|
import re
|
||||||
from typing import Sequence, Optional
|
from typing import Sequence, Optional
|
||||||
|
|
||||||
from sqlalchemy import select, func
|
from sqlalchemy import select, func, update
|
||||||
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
|
|
||||||
from amanuensis.db import DbContext, Lexicon, Membership
|
from amanuensis.db import DbContext, Lexicon, Membership
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
|
RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
|
||||||
|
@ -25,7 +26,7 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify name
|
# Verify name
|
||||||
if not isinstance(name, str):
|
if not isinstance(name, str):
|
||||||
raise ArgumentError("Lexicon name must be a string")
|
raise BackendArgumentTypeError(str, name=name)
|
||||||
if not name.strip():
|
if not name.strip():
|
||||||
raise ArgumentError("Lexicon name must not be blank")
|
raise ArgumentError("Lexicon name must not be blank")
|
||||||
if not RE_ALPHANUM_DASH_UNDER.match(name):
|
if not RE_ALPHANUM_DASH_UNDER.match(name):
|
||||||
|
@ -34,12 +35,12 @@ def create(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Verify title
|
# Verify title
|
||||||
if title is not None and not isinstance(name, str):
|
if title is not None and not isinstance(title, str):
|
||||||
raise ArgumentError("Lexicon name must be a string")
|
raise BackendArgumentTypeError(str, title=title)
|
||||||
|
|
||||||
# Verify prompt
|
# Verify prompt
|
||||||
if not isinstance(prompt, str):
|
if not isinstance(prompt, str):
|
||||||
raise ArgumentError("Lexicon prompt must be a string")
|
raise BackendArgumentTypeError(str, prompt=prompt)
|
||||||
|
|
||||||
# Query the db to make sure the lexicon name isn't taken
|
# Query the db to make sure the lexicon name isn't taken
|
||||||
if db(select(func.count(Lexicon.id)).where(Lexicon.name == name)).scalar() > 0:
|
if db(select(func.count(Lexicon.id)).where(Lexicon.name == name)).scalar() > 0:
|
||||||
|
@ -55,11 +56,6 @@ def create(
|
||||||
return new_lexicon
|
return new_lexicon
|
||||||
|
|
||||||
|
|
||||||
def from_name(db: DbContext, name: str) -> Lexicon:
|
|
||||||
"""Get a lexicon by its name."""
|
|
||||||
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
|
|
||||||
|
|
||||||
|
|
||||||
def get_all(db: DbContext) -> Sequence[Lexicon]:
|
def get_all(db: DbContext) -> Sequence[Lexicon]:
|
||||||
"""Get all lexicons."""
|
"""Get all lexicons."""
|
||||||
return db(select(Lexicon)).scalars()
|
return db(select(Lexicon)).scalars()
|
||||||
|
@ -75,3 +71,23 @@ def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
|
||||||
def get_public(db: DbContext) -> Sequence[Lexicon]:
|
def get_public(db: DbContext) -> Sequence[Lexicon]:
|
||||||
"""Get all publicly visible lexicons."""
|
"""Get all publicly visible lexicons."""
|
||||||
return db(select(Lexicon).where(Lexicon.public == True)).scalars()
|
return db(select(Lexicon).where(Lexicon.public == True)).scalars()
|
||||||
|
|
||||||
|
|
||||||
|
def password_check(db: DbContext, lexicon_id: int, password: str) -> bool:
|
||||||
|
"""Check if a password is correct."""
|
||||||
|
password_hash: str = db(
|
||||||
|
select(Lexicon.join_password).where(Lexicon.id == lexicon_id)
|
||||||
|
).scalar_one()
|
||||||
|
return check_password_hash(password_hash, password)
|
||||||
|
|
||||||
|
|
||||||
|
def password_set(db: DbContext, lexicon_id: int, new_password: Optional[str]) -> None:
|
||||||
|
"""Set or clear a lexicon's password."""
|
||||||
|
password_hash = generate_password_hash(new_password) if new_password else None
|
||||||
|
db(update(Lexicon).where(Lexicon.id == lexicon_id).values(password=password_hash))
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def try_from_name(db: DbContext, name: str) -> Optional[Lexicon]:
|
||||||
|
"""Get a lexicon by its name, or None if no such lexicon was found."""
|
||||||
|
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one_or_none()
|
||||||
|
|
|
@ -6,7 +6,7 @@ from sqlalchemy import select, func
|
||||||
|
|
||||||
from amanuensis.db import DbContext, Membership
|
from amanuensis.db import DbContext, Membership
|
||||||
from amanuensis.db.models import Lexicon
|
from amanuensis.db.models import Lexicon
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
|
@ -20,11 +20,11 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify argument types are correct
|
# Verify argument types are correct
|
||||||
if not isinstance(user_id, int):
|
if not isinstance(user_id, int):
|
||||||
raise ArgumentError("user_id")
|
raise BackendArgumentTypeError(int, user_id=user_id)
|
||||||
if not isinstance(lexicon_id, int):
|
if not isinstance(lexicon_id, int):
|
||||||
raise ArgumentError("lexicon_id")
|
raise BackendArgumentTypeError(int, lexicon_id=lexicon_id)
|
||||||
if not isinstance(is_editor, bool):
|
if not isinstance(is_editor, bool):
|
||||||
raise ArgumentError("is_editor")
|
raise BackendArgumentTypeError(bool, is_editor=is_editor)
|
||||||
|
|
||||||
# Verify user has not already joined lexicon
|
# Verify user has not already joined lexicon
|
||||||
if (
|
if (
|
||||||
|
@ -64,3 +64,12 @@ def create(
|
||||||
db.session.add(new_membership)
|
db.session.add(new_membership)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
return new_membership
|
return new_membership
|
||||||
|
|
||||||
|
|
||||||
|
def try_from_ids(db: DbContext, user_id: int, lexicon_id: int) -> Membership:
|
||||||
|
"""Get a membership by the user and lexicon ids, or None if no such membership was found."""
|
||||||
|
return db(
|
||||||
|
select(Membership)
|
||||||
|
.where(Membership.user_id == user_id)
|
||||||
|
.where(Membership.lexicon_id == lexicon_id)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
|
|
@ -8,7 +8,7 @@ from sqlalchemy import select
|
||||||
|
|
||||||
from amanuensis.db import DbContext, Post
|
from amanuensis.db import DbContext, Post
|
||||||
from amanuensis.db.models import Lexicon
|
from amanuensis.db.models import Lexicon
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def create(
|
def create(
|
||||||
|
@ -23,15 +23,15 @@ def create(
|
||||||
|
|
||||||
# Verify lexicon id
|
# Verify lexicon id
|
||||||
if not isinstance(lexicon_id, int):
|
if not isinstance(lexicon_id, int):
|
||||||
raise ArgumentError("Lexicon id must be an integer.")
|
raise BackendArgumentTypeError(int, lexicon_id=lexicon_id)
|
||||||
|
|
||||||
# Verify user_id
|
# Verify user_id
|
||||||
if not (isinstance(user_id, int) or user_id is None):
|
if user_id is not None and not isinstance(user_id, int):
|
||||||
raise ArgumentError("User id must be an integer.")
|
raise BackendArgumentTypeError(int, user_id=user_id)
|
||||||
|
|
||||||
# Verify body
|
# Verify body
|
||||||
if not isinstance(body, str):
|
if not isinstance(body, str):
|
||||||
raise ArgumentError("Post body must be a string.")
|
raise BackendArgumentTypeError(str, body=body)
|
||||||
if not body.strip():
|
if not body.strip():
|
||||||
raise ArgumentError("Post body cannot be empty.")
|
raise ArgumentError("Post body cannot be empty.")
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ from sqlalchemy import select, func, update
|
||||||
from werkzeug.security import generate_password_hash, check_password_hash
|
from werkzeug.security import generate_password_hash, check_password_hash
|
||||||
|
|
||||||
from amanuensis.db import DbContext, User
|
from amanuensis.db import DbContext, User
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
RE_NO_LETTERS = re.compile(r"^[0-9-_]*$")
|
RE_NO_LETTERS = re.compile(r"^[0-9-_]*$")
|
||||||
|
@ -30,7 +30,7 @@ def create(
|
||||||
"""
|
"""
|
||||||
# Verify username
|
# Verify username
|
||||||
if not isinstance(username, str):
|
if not isinstance(username, str):
|
||||||
raise ArgumentError("Username must be a string")
|
raise BackendArgumentTypeError(str, username=username)
|
||||||
if len(username) < 3 or len(username) > 32:
|
if len(username) < 3 or len(username) > 32:
|
||||||
raise ArgumentError("Username must be between 3 and 32 characters")
|
raise ArgumentError("Username must be between 3 and 32 characters")
|
||||||
if RE_NO_LETTERS.match(username):
|
if RE_NO_LETTERS.match(username):
|
||||||
|
@ -42,18 +42,18 @@ def create(
|
||||||
|
|
||||||
# Verify password
|
# Verify password
|
||||||
if not isinstance(password, str):
|
if not isinstance(password, str):
|
||||||
raise ArgumentError("Password must be a string")
|
raise BackendArgumentTypeError(str, password=password)
|
||||||
|
|
||||||
# Verify display name
|
# Verify display name
|
||||||
if display_name is not None and not isinstance(display_name, str):
|
if display_name is not None and not isinstance(display_name, str):
|
||||||
raise ArgumentError("Display name must be a string")
|
raise BackendArgumentTypeError(str, display_name=display_name)
|
||||||
# If display name is not provided, use the username
|
# If display name is not provided, use the username
|
||||||
if not display_name or not display_name.strip():
|
if not display_name or not display_name.strip():
|
||||||
display_name = username
|
display_name = username
|
||||||
|
|
||||||
# Verify email
|
# Verify email
|
||||||
if not isinstance(email, str):
|
if not isinstance(email, str):
|
||||||
raise ArgumentError("Email must be a string")
|
raise BackendArgumentTypeError(str, email=email)
|
||||||
|
|
||||||
# Query the db to make sure the username isn't taken
|
# Query the db to make sure the username isn't taken
|
||||||
if db(select(func.count(User.id)).where(User.username == username)).scalar() > 0:
|
if db(select(func.count(User.id)).where(User.username == username)).scalar() > 0:
|
||||||
|
@ -71,36 +71,11 @@ def create(
|
||||||
return new_user
|
return new_user
|
||||||
|
|
||||||
|
|
||||||
def from_id(db: DbContext, user_id: int) -> Optional[User]:
|
|
||||||
"""
|
|
||||||
Get a user by the user's id.
|
|
||||||
Returns None if no user was found.
|
|
||||||
"""
|
|
||||||
user: User = db(select(User).where(User.id == user_id)).scalar_one_or_none()
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
def from_username(db: DbContext, username: str) -> Optional[User]:
|
|
||||||
"""
|
|
||||||
Get a user by the user's username.
|
|
||||||
Returns None if no user was found.
|
|
||||||
"""
|
|
||||||
user: User = db(select(User).where(User.username == username)).scalar_one_or_none()
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
def get_all(db: DbContext) -> Sequence[User]:
|
def get_all(db: DbContext) -> Sequence[User]:
|
||||||
"""Get all users."""
|
"""Get all users."""
|
||||||
return db(select(User)).scalars()
|
return db(select(User)).scalars()
|
||||||
|
|
||||||
|
|
||||||
def password_set(db: DbContext, username: str, new_password: str) -> None:
|
|
||||||
"""Set a user's password."""
|
|
||||||
password_hash = generate_password_hash(new_password)
|
|
||||||
db(update(User).where(User.username == username).values(password=password_hash))
|
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
|
|
||||||
def password_check(db: DbContext, username: str, password: str) -> bool:
|
def password_check(db: DbContext, username: str, password: str) -> bool:
|
||||||
"""Check if a password is correct."""
|
"""Check if a password is correct."""
|
||||||
user_password_hash: str = db(
|
user_password_hash: str = db(
|
||||||
|
@ -109,6 +84,23 @@ def password_check(db: DbContext, username: str, password: str) -> bool:
|
||||||
return check_password_hash(user_password_hash, password)
|
return check_password_hash(user_password_hash, password)
|
||||||
|
|
||||||
|
|
||||||
|
def password_set(db: DbContext, username: str, new_password: str) -> None:
|
||||||
|
"""Set a user's password."""
|
||||||
|
password_hash = generate_password_hash(new_password)
|
||||||
|
db(update(User).where(User.username == username).values(password=password_hash))
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
def try_from_id(db: DbContext, user_id: int) -> Optional[User]:
|
||||||
|
"""Get a user by the user's id, or None is no such user was found."""
|
||||||
|
return db(select(User).where(User.id == user_id)).scalar_one_or_none()
|
||||||
|
|
||||||
|
|
||||||
|
def try_from_username(db: DbContext, username: str) -> Optional[User]:
|
||||||
|
"""Get a user by the user's username, or None is no such user was found."""
|
||||||
|
return db(select(User).where(User.username == username)).scalar_one_or_none()
|
||||||
|
|
||||||
|
|
||||||
def update_logged_in(db: DbContext, username: str) -> None:
|
def update_logged_in(db: DbContext, username: str) -> None:
|
||||||
"""Bump the value of the last_login column for a user."""
|
"""Bump the value of the last_login column for a user."""
|
||||||
db(
|
db(
|
||||||
|
|
|
@ -2,9 +2,7 @@ import logging
|
||||||
|
|
||||||
from sqlalchemy import update
|
from sqlalchemy import update
|
||||||
|
|
||||||
import amanuensis.backend.lexicon as lexiq
|
from amanuensis.backend import lexiq, memq, userq
|
||||||
import amanuensis.backend.membership as memq
|
|
||||||
import amanuensis.backend.user as userq
|
|
||||||
from amanuensis.db import DbContext, Lexicon
|
from amanuensis.db import DbContext, Lexicon
|
||||||
|
|
||||||
from .helpers import add_argument
|
from .helpers import add_argument
|
||||||
|
@ -24,9 +22,12 @@ def command_add(args) -> int:
|
||||||
Add a user to a lexicon.
|
Add a user to a lexicon.
|
||||||
"""
|
"""
|
||||||
db: DbContext = args.get_db()
|
db: DbContext = args.get_db()
|
||||||
lexicon = lexiq.from_name(db, args.lexicon)
|
lexicon = lexiq.try_from_name(db, args.lexicon)
|
||||||
user = userq.from_username(db, args.user)
|
if not lexicon:
|
||||||
assert user is not None
|
raise ValueError("Lexicon does not exist")
|
||||||
|
user = userq.try_from_username(db, args.user)
|
||||||
|
if not user:
|
||||||
|
raise ValueError("User does not exist")
|
||||||
memq.create(db, user.id, lexicon.id, args.editor)
|
memq.create(db, user.id, lexicon.id, args.editor)
|
||||||
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
|
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
|
||||||
return 0
|
return 0
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import amanuensis.backend.user as userq
|
from amanuensis.backend import userq
|
||||||
from amanuensis.db import DbContext, User
|
from amanuensis.db import DbContext, User
|
||||||
|
|
||||||
from .helpers import add_argument
|
from .helpers import add_argument
|
||||||
|
@ -29,7 +29,7 @@ def command_create(args) -> int:
|
||||||
def command_promote(args) -> int:
|
def command_promote(args) -> int:
|
||||||
"""Make a user a site admin."""
|
"""Make a user a site admin."""
|
||||||
db: DbContext = args.get_db()
|
db: DbContext = args.get_db()
|
||||||
user: Optional[User] = userq.from_username(db, args.username)
|
user: Optional[User] = userq.try_from_username(db, args.username)
|
||||||
if user is None:
|
if user is None:
|
||||||
args.parser.error("User not found")
|
args.parser.error("User not found")
|
||||||
return -1
|
return -1
|
||||||
|
@ -46,7 +46,7 @@ def command_promote(args) -> int:
|
||||||
def command_demote(args):
|
def command_demote(args):
|
||||||
"""Revoke a user's site admin status."""
|
"""Revoke a user's site admin status."""
|
||||||
db: DbContext = args.get_db()
|
db: DbContext = args.get_db()
|
||||||
user: Optional[User] = userq.from_username(db, args.username)
|
user: Optional[User] = userq.try_from_username(db, args.username)
|
||||||
if user is None:
|
if user is None:
|
||||||
args.parser.error("User not found")
|
args.parser.error("User not found")
|
||||||
return -1
|
return -1
|
||||||
|
|
|
@ -4,8 +4,21 @@ Submodule of custom exception types
|
||||||
|
|
||||||
|
|
||||||
class AmanuensisError(Exception):
|
class AmanuensisError(Exception):
|
||||||
"""Base class for exceptions in amanuensis"""
|
"""Base class for exceptions in Amanuensis"""
|
||||||
|
|
||||||
|
|
||||||
class ArgumentError(AmanuensisError):
|
class ArgumentError(AmanuensisError):
|
||||||
"""An internal call was made with invalid arguments"""
|
"""An internal call was made with invalid arguments."""
|
||||||
|
|
||||||
|
|
||||||
|
class BackendArgumentTypeError(ArgumentError):
|
||||||
|
"""
|
||||||
|
A call to a backend function was made with a value of an invalid type for the parameter.
|
||||||
|
Specify the invalid parameter and value as a kwarg.
|
||||||
|
"""
|
||||||
|
def __init__(self, obj_type, **kwarg):
|
||||||
|
if not kwarg:
|
||||||
|
raise ValueError("Missing kwarg")
|
||||||
|
param, value = next(iter(kwarg.items()))
|
||||||
|
msg = f"Expected {param} of type {obj_type}, got {type(value)}"
|
||||||
|
super().__init__(msg)
|
||||||
|
|
|
@ -11,58 +11,6 @@ from amanuensis.models import LexiconModel, UserModel
|
||||||
from amanuensis.resources import get_stream
|
from amanuensis.resources import get_stream
|
||||||
|
|
||||||
|
|
||||||
def player_can_join_lexicon(
|
|
||||||
player: UserModel,
|
|
||||||
lexicon: LexiconModel,
|
|
||||||
password: str = None) -> bool:
|
|
||||||
"""
|
|
||||||
Checks whether the given player can join a lexicon
|
|
||||||
"""
|
|
||||||
# Trivial failures
|
|
||||||
if lexicon is None:
|
|
||||||
return False
|
|
||||||
if player is None:
|
|
||||||
return False
|
|
||||||
# Can't join if already in the game
|
|
||||||
if player.uid in lexicon.cfg.join.joined:
|
|
||||||
return False
|
|
||||||
# Can't join if the game is closed
|
|
||||||
if not lexicon.cfg.join.open:
|
|
||||||
return False
|
|
||||||
# Can't join if there's no room left
|
|
||||||
if len(lexicon.cfg.join.joined) >= lexicon.cfg.join.max_players:
|
|
||||||
return False
|
|
||||||
# Can't join if the password doesn't check out
|
|
||||||
if (lexicon.cfg.join.password is not None
|
|
||||||
and lexicon.cfg.join.password != password):
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def add_player_to_lexicon(
|
|
||||||
player: UserModel,
|
|
||||||
lexicon: LexiconModel) -> None:
|
|
||||||
"""
|
|
||||||
Unconditionally adds a player to a lexicon
|
|
||||||
"""
|
|
||||||
# Verify arguments
|
|
||||||
if lexicon is None:
|
|
||||||
raise ArgumentError(f'Invalid lexicon: {lexicon}')
|
|
||||||
if player is None:
|
|
||||||
raise ArgumentError(f'Invalid player: {player}')
|
|
||||||
|
|
||||||
# Idempotently add player
|
|
||||||
added = False
|
|
||||||
with lexicon.ctx.edit_config() as cfg:
|
|
||||||
if player.uid not in cfg.join.joined:
|
|
||||||
cfg.join.joined.append(player.uid)
|
|
||||||
added = True
|
|
||||||
|
|
||||||
# Log to the lexicon's log
|
|
||||||
if added:
|
|
||||||
lexicon.log('Player "{0.cfg.username}" joined ({0.uid})'.format(player))
|
|
||||||
|
|
||||||
|
|
||||||
def player_can_create_character(
|
def player_can_create_character(
|
||||||
player: UserModel,
|
player: UserModel,
|
||||||
lexicon: LexiconModel,
|
lexicon: LexiconModel,
|
||||||
|
|
|
@ -2,14 +2,32 @@ from datetime import datetime, timezone
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from flask import Flask, g
|
from flask import Flask, g, url_for, redirect
|
||||||
|
|
||||||
import amanuensis.backend.lexicon
|
from amanuensis.backend import lexiq, userq, memq
|
||||||
import amanuensis.backend.user
|
|
||||||
from amanuensis.config import AmanuensisConfig, CommandLineConfig
|
from amanuensis.config import AmanuensisConfig, CommandLineConfig
|
||||||
from amanuensis.db import DbContext
|
from amanuensis.db import DbContext
|
||||||
|
from amanuensis.parser import filesafe_title
|
||||||
import amanuensis.server.auth as auth
|
import amanuensis.server.auth as auth
|
||||||
import amanuensis.server.home as home
|
import amanuensis.server.home as home
|
||||||
|
import amanuensis.server.lexicon as lexicon
|
||||||
|
|
||||||
|
|
||||||
|
def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str:
|
||||||
|
"""Convert datetime to human-readable string"""
|
||||||
|
if dt is None:
|
||||||
|
return "never"
|
||||||
|
# Cast db time to UTC, then convert to local timezone
|
||||||
|
adjusted = dt.replace(tzinfo=timezone.utc).astimezone()
|
||||||
|
return adjusted.strftime(formatstr)
|
||||||
|
|
||||||
|
|
||||||
|
def article_link(title):
|
||||||
|
"""Get the url for a lexicon by its title"""
|
||||||
|
return url_for(
|
||||||
|
'lexicon.article',
|
||||||
|
name=g.lexicon.name,
|
||||||
|
title=filesafe_title(title))
|
||||||
|
|
||||||
|
|
||||||
def get_app(
|
def get_app(
|
||||||
|
@ -49,20 +67,12 @@ def get_app(
|
||||||
app.teardown_appcontext(db_teardown)
|
app.teardown_appcontext(db_teardown)
|
||||||
|
|
||||||
# Configure jinja options
|
# Configure jinja options
|
||||||
app.jinja_options.update(trim_blocks=True, lstrip_blocks=True)
|
|
||||||
|
|
||||||
def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str:
|
|
||||||
if dt is None:
|
|
||||||
return "never"
|
|
||||||
# Cast db time to UTC, then convert to local timezone
|
|
||||||
adjusted = dt.replace(tzinfo=timezone.utc).astimezone()
|
|
||||||
return adjusted.strftime(formatstr)
|
|
||||||
|
|
||||||
app.template_filter("date")(date_format)
|
|
||||||
|
|
||||||
def include_backend():
|
def include_backend():
|
||||||
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
|
return {"db": db, "lexiq": lexiq, "userq": userq, "memq": memq}
|
||||||
|
|
||||||
|
app.jinja_options.update(trim_blocks=True, lstrip_blocks=True)
|
||||||
|
app.template_filter("date")(date_format)
|
||||||
|
app.template_filter("articlelink")(article_link)
|
||||||
app.context_processor(include_backend)
|
app.context_processor(include_backend)
|
||||||
|
|
||||||
# Set up Flask-Login
|
# Set up Flask-Login
|
||||||
|
@ -71,11 +81,13 @@ def get_app(
|
||||||
# Register blueprints
|
# Register blueprints
|
||||||
app.register_blueprint(auth.bp)
|
app.register_blueprint(auth.bp)
|
||||||
app.register_blueprint(home.bp)
|
app.register_blueprint(home.bp)
|
||||||
|
app.register_blueprint(lexicon.bp)
|
||||||
|
|
||||||
def test():
|
# Add a root redirect
|
||||||
return "Hello, world!"
|
def root():
|
||||||
|
return redirect(url_for("home.home"))
|
||||||
|
|
||||||
app.route("/")(test)
|
app.route("/")(root)
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ from flask_login import (
|
||||||
LoginManager,
|
LoginManager,
|
||||||
)
|
)
|
||||||
|
|
||||||
import amanuensis.backend.user as userq
|
from amanuensis.backend import userq
|
||||||
from amanuensis.db import User
|
from amanuensis.db import User
|
||||||
|
|
||||||
from .forms import LoginForm
|
from .forms import LoginForm
|
||||||
|
@ -39,7 +39,7 @@ def get_login_manager() -> LoginManager:
|
||||||
user_id = int(user_id_str)
|
user_id = int(user_id_str)
|
||||||
except:
|
except:
|
||||||
return None
|
return None
|
||||||
return userq.from_id(g.db, user_id)
|
return userq.try_from_id(g.db, user_id)
|
||||||
|
|
||||||
login_manager.user_loader(load_user)
|
login_manager.user_loader(load_user)
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ def login():
|
||||||
# POST with valid data
|
# POST with valid data
|
||||||
username: str = form.username.data
|
username: str = form.username.data
|
||||||
password: str = form.password.data
|
password: str = form.password.data
|
||||||
user: User = userq.from_username(g.db, username)
|
user: User = userq.try_from_username(g.db, username)
|
||||||
if not user or not userq.password_check(g.db, username, password):
|
if not user or not userq.password_check(g.db, username, password):
|
||||||
# Bad creds
|
# Bad creds
|
||||||
flash("Login not recognized")
|
flash("Login not recognized")
|
||||||
|
|
|
@ -1,62 +1,39 @@
|
||||||
# Standard library imports
|
|
||||||
from datetime import datetime
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
# Third party imports
|
from flask import g, flash, redirect, url_for
|
||||||
from flask import g, flash, redirect, url_for, current_app
|
|
||||||
from flask_login import current_user
|
from flask_login import current_user
|
||||||
|
|
||||||
# Module imports
|
from amanuensis.backend import lexiq, memq
|
||||||
from amanuensis.parser import filesafe_title
|
from amanuensis.db import DbContext, Lexicon, User, Membership
|
||||||
from amanuensis.models import ModelFactory, UserModel, LexiconModel
|
|
||||||
|
|
||||||
|
|
||||||
def register_custom_filters(app):
|
|
||||||
"""Adds custom filters to the Flask app"""
|
|
||||||
|
|
||||||
@app.template_filter("user_attr")
|
|
||||||
def get_user_attr(uid, attr):
|
|
||||||
factory: ModelFactory = current_app.config['model_factory']
|
|
||||||
user: UserModel = factory.user(uid)
|
|
||||||
val = getattr(user.cfg, attr)
|
|
||||||
return val
|
|
||||||
|
|
||||||
@app.template_filter("articlelink")
|
|
||||||
def article_link(title):
|
|
||||||
return url_for(
|
|
||||||
'lexicon.article',
|
|
||||||
name=g.lexicon.cfg.name,
|
|
||||||
title=filesafe_title(title))
|
|
||||||
|
|
||||||
@app.context_processor
|
|
||||||
def lexicon_status():
|
|
||||||
return dict(
|
|
||||||
PREGAME=LexiconModel.PREGAME,
|
|
||||||
ONGOING=LexiconModel.ONGOING,
|
|
||||||
COMPLETE=LexiconModel.COMPLETE)
|
|
||||||
|
|
||||||
|
|
||||||
def lexicon_param(route):
|
def lexicon_param(route):
|
||||||
"""Wrapper for loading a route's lexicon"""
|
"""
|
||||||
|
Wrapper for loading a route's lexicon to `g`.
|
||||||
|
This decorator should be applied above any other decorators that reference `g.lexicon`.
|
||||||
|
"""
|
||||||
@wraps(route)
|
@wraps(route)
|
||||||
def with_lexicon(**kwargs):
|
def with_lexicon(*args, **kwargs):
|
||||||
name = kwargs.get('name')
|
db: DbContext = g.db
|
||||||
model_factory: ModelFactory = current_app.config['model_factory']
|
name: str = kwargs.get('name')
|
||||||
g.lexicon = model_factory.lexicon(name)
|
lexicon: Optional[Lexicon] = lexiq.try_from_name(db, name)
|
||||||
if g.lexicon is None:
|
if lexicon is None:
|
||||||
flash(f'Couldn\'t find a lexicon with the name "{name}"')
|
flash(f"Couldn't find a lexicon with the name \"{name}\"")
|
||||||
return redirect(url_for("home.home"))
|
return redirect(url_for("home.home"))
|
||||||
return route(**kwargs)
|
g.lexicon = lexicon
|
||||||
|
return route(*args, **kwargs)
|
||||||
return with_lexicon
|
return with_lexicon
|
||||||
|
|
||||||
|
|
||||||
def admin_required(route):
|
def admin_required(route):
|
||||||
"""
|
"""
|
||||||
Requires the user to be an admin to load this page
|
Restricts a route to users who are site admins.
|
||||||
"""
|
"""
|
||||||
@wraps(route)
|
@wraps(route)
|
||||||
def admin_route(*args, **kwargs):
|
def admin_route(*args, **kwargs):
|
||||||
if not current_user.cfg.is_admin:
|
user: User = current_user
|
||||||
|
if not user.is_site_admin:
|
||||||
flash("You must be an admin to view this page")
|
flash("You must be an admin to view this page")
|
||||||
return redirect(url_for('home.home'))
|
return redirect(url_for('home.home'))
|
||||||
return route(*args, **kwargs)
|
return route(*args, **kwargs)
|
||||||
|
@ -65,28 +42,36 @@ def admin_required(route):
|
||||||
|
|
||||||
def player_required(route):
|
def player_required(route):
|
||||||
"""
|
"""
|
||||||
Requires the user to be a player in the lexicon to load this page
|
Restricts a route to users who are players in the current lexicon.
|
||||||
"""
|
"""
|
||||||
@wraps(route)
|
@wraps(route)
|
||||||
def player_route(*args, **kwargs):
|
def player_route(*args, **kwargs):
|
||||||
if current_user.uid not in g.lexicon.cfg.join.joined:
|
db: DbContext = g.db
|
||||||
|
user: User = current_user
|
||||||
|
lexicon: Lexicon = g.lexicon
|
||||||
|
mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
|
||||||
|
if not mem:
|
||||||
flash("You must be a player to view this page")
|
flash("You must be a player to view this page")
|
||||||
return (redirect(url_for('lexicon.contents', name=g.lexicon.cfg.name))
|
if lexicon.public:
|
||||||
if g.lexicon.cfg.join.public
|
return redirect(url_for('lexicon.contents', name=lexicon.name))
|
||||||
else redirect(url_for('home.home')))
|
else:
|
||||||
|
return redirect(url_for('home.home'))
|
||||||
return route(*args, **kwargs)
|
return route(*args, **kwargs)
|
||||||
return player_route
|
return player_route
|
||||||
|
|
||||||
|
|
||||||
def player_required_if_not_public(route):
|
def player_required_if_not_public(route):
|
||||||
"""
|
"""
|
||||||
Requires the user to be a player in the lexicon to load this page if the
|
Restricts a route to users who are players in the current lexicon if the lexicon is nonpublic.
|
||||||
lexicon has join.public = false
|
|
||||||
"""
|
"""
|
||||||
@wraps(route)
|
@wraps(route)
|
||||||
def player_route(*args, **kwargs):
|
def player_route(*args, **kwargs):
|
||||||
if ((not g.lexicon.cfg.join.public)
|
db: DbContext = g.db
|
||||||
and current_user.uid not in g.lexicon.cfg.join.joined):
|
user: User = current_user
|
||||||
|
lexicon: Lexicon = g.lexicon
|
||||||
|
if not lexicon.public:
|
||||||
|
mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
|
||||||
|
if not mem:
|
||||||
flash("You must be a player to view this page")
|
flash("You must be a player to view this page")
|
||||||
return redirect(url_for('home.home'))
|
return redirect(url_for('home.home'))
|
||||||
return route(*args, **kwargs)
|
return route(*args, **kwargs)
|
||||||
|
@ -95,13 +80,16 @@ def player_required_if_not_public(route):
|
||||||
|
|
||||||
def editor_required(route):
|
def editor_required(route):
|
||||||
"""
|
"""
|
||||||
Requires the user to be the editor of the current lexicon to load this
|
Restricts a route to users who are editors of the current lexicon.
|
||||||
page
|
|
||||||
"""
|
"""
|
||||||
@wraps(route)
|
@wraps(route)
|
||||||
def editor_route(*args, **kwargs):
|
def editor_route(*args, **kwargs):
|
||||||
if current_user.uid != g.lexicon.cfg.editor:
|
db: DbContext = g.db
|
||||||
|
user: User = current_user
|
||||||
|
lexicon: Lexicon = g.lexicon
|
||||||
|
mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
|
||||||
|
if not mem.is_editor:
|
||||||
flash("You must be the editor to view this page")
|
flash("You must be the editor to view this page")
|
||||||
return redirect(url_for('lexicon.contents', name=g.lexicon.cfg.name))
|
return redirect(url_for('lexicon.contents', name=lexicon.name))
|
||||||
return route(*args, **kwargs)
|
return route(*args, **kwargs)
|
||||||
return editor_route
|
return editor_route
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
from flask import Blueprint, render_template, g
|
from flask import Blueprint, render_template, g
|
||||||
|
|
||||||
import amanuensis.backend.user as userq
|
from amanuensis.backend import userq, lexiq
|
||||||
import amanuensis.backend.lexicon as lexiq
|
|
||||||
|
|
||||||
# from .forms import LexiconCreateForm
|
# from .forms import LexiconCreateForm
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
{% extends "page_2col.jinja" %}
|
{% extends "page_2col.jinja" %}
|
||||||
{% set lexicon_title = g.lexicon.title %}
|
{% set lexicon_title = g.lexicon.full_title %}
|
||||||
|
|
||||||
{% block header %}
|
{% block header %}
|
||||||
<h2>{{ lexicon_title }}</h2>
|
<h2>{{ lexicon_title }}</h2>
|
||||||
<p><i>{{ g.lexicon.cfg.prompt }}</i></p>
|
<p><i>{{ g.lexicon.prompt }}</i></p>
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
|
|
||||||
{% block sb_logo %}{% endblock %}
|
{% block sb_logo %}{% endblock %}
|
||||||
|
@ -11,22 +11,22 @@
|
||||||
{% endblock %}
|
{% endblock %}
|
||||||
{% block sb_contents %}<a
|
{% block sb_contents %}<a
|
||||||
{% if current_page == "contents" %}class="current-page"
|
{% if current_page == "contents" %}class="current-page"
|
||||||
{% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.cfg.name) }}"
|
{% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.name) }}"
|
||||||
{% endif %}>Contents</a>{% endblock %}
|
{% endif %}>Contents</a>{% endblock %}
|
||||||
{% block sb_rules %}<a
|
{% block sb_rules %}<a
|
||||||
{% if current_page == "rules" %}class="current-page"
|
{% if current_page == "rules" %}class="current-page"
|
||||||
{% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.cfg.name) }}"
|
{% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.name) }}"
|
||||||
{% endif %}>Rules</a>{% endblock %}
|
{% endif %}>Rules</a>{% endblock %}
|
||||||
{% block sb_session %}<a
|
{% block sb_session %}<a
|
||||||
{% if current_page == "session" %}class="current-page"
|
{% if current_page == "session" %}class="current-page"
|
||||||
{% else %}href="{{ url_for('session.session', name=g.lexicon.cfg.name) }}"
|
{% else %}href="#{#{ url_for('session.session', name=g.lexicon.name) }#}"
|
||||||
{% endif %}>Session</a>{% endblock %}
|
{% endif %}>Session</a>{% endblock %}
|
||||||
{% block sb_stats %}<a
|
{% block sb_stats %}<a
|
||||||
{% if current_page == "statistics" %}class="current-page"
|
{% if current_page == "statistics" %}class="current-page"
|
||||||
{% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.cfg.name) }}"
|
{% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.name) }}"
|
||||||
{% endif %}>Statistics</a>{% endblock %}
|
{% endif %}>Statistics</a>{% endblock %}
|
||||||
|
|
||||||
{% if current_user.uid in g.lexicon.cfg.join.joined %}
|
{% if current_user.is_authenticated and memq.try_from_ids(g.db, current_user.id, g.lexicon.id) %}
|
||||||
{# self.sb_logo(), #}
|
{# self.sb_logo(), #}
|
||||||
{% set template_sidebar_rows = [
|
{% set template_sidebar_rows = [
|
||||||
self.sb_home(),
|
self.sb_home(),
|
||||||
|
|
|
@ -1,96 +1,82 @@
|
||||||
from flask import (
|
from flask import Blueprint, flash, redirect, url_for, g, render_template, Markup
|
||||||
Blueprint,
|
|
||||||
flash,
|
|
||||||
redirect,
|
|
||||||
url_for,
|
|
||||||
g,
|
|
||||||
render_template,
|
|
||||||
Markup)
|
|
||||||
from flask_login import login_required, current_user
|
from flask_login import login_required, current_user
|
||||||
|
|
||||||
from amanuensis.lexicon import (
|
from amanuensis.backend import lexiq, memq
|
||||||
player_can_join_lexicon,
|
from amanuensis.db import DbContext, Lexicon, User
|
||||||
add_player_to_lexicon,
|
from amanuensis.errors import ArgumentError
|
||||||
sort_by_index_spec)
|
from amanuensis.server.helpers import lexicon_param, player_required_if_not_public
|
||||||
from amanuensis.models import LexiconModel
|
|
||||||
from amanuensis.server.helpers import (
|
|
||||||
lexicon_param,
|
|
||||||
player_required_if_not_public)
|
|
||||||
|
|
||||||
from .forms import LexiconJoinForm
|
from .forms import LexiconJoinForm
|
||||||
|
|
||||||
|
|
||||||
bp_lexicon = Blueprint('lexicon', __name__,
|
bp = Blueprint("lexicon", __name__, url_prefix="/lexicon/<name>", template_folder=".")
|
||||||
url_prefix='/lexicon/<name>',
|
|
||||||
template_folder='.')
|
|
||||||
|
|
||||||
|
|
||||||
@bp_lexicon.route("/join/", methods=['GET', 'POST'])
|
@bp.route("/join/", methods=["GET", "POST"])
|
||||||
@lexicon_param
|
@lexicon_param
|
||||||
@login_required
|
@login_required
|
||||||
def join(name):
|
def join(name):
|
||||||
if g.lexicon.status != LexiconModel.PREGAME:
|
lexicon: Lexicon = g.lexicon
|
||||||
flash("Can't join a game already in progress")
|
if not lexicon.joinable:
|
||||||
return redirect(url_for('home.home'))
|
|
||||||
|
|
||||||
if not g.lexicon.cfg.join.open:
|
|
||||||
flash("This game isn't open for joining")
|
flash("This game isn't open for joining")
|
||||||
return redirect(url_for('home.home'))
|
return redirect(url_for("home.home"))
|
||||||
|
|
||||||
form = LexiconJoinForm()
|
form = LexiconJoinForm()
|
||||||
|
|
||||||
if not form.validate_on_submit():
|
if not form.validate_on_submit():
|
||||||
# GET or POST with invalid form data
|
# GET or POST with invalid form data
|
||||||
return render_template('lexicon.join.jinja', form=form)
|
return render_template("lexicon.join.jinja", form=form)
|
||||||
|
|
||||||
# POST with valid data
|
# POST with valid data
|
||||||
# If the game is passworded, check password
|
# If the game is passworded, check password
|
||||||
if (g.lexicon.cfg.join.password
|
db: DbContext = g.db
|
||||||
and form.password.data != g.lexicon.cfg.join.password):
|
if lexicon.join_password and not lexiq.password_check(
|
||||||
|
db, lexicon.id, form.password.data
|
||||||
|
):
|
||||||
# Bad creds, try again
|
# Bad creds, try again
|
||||||
flash('Incorrect password')
|
flash("Incorrect password")
|
||||||
return redirect(url_for('lexicon.join', name=name))
|
return redirect(url_for("lexicon.join", name=name))
|
||||||
|
|
||||||
# If the password was correct, check if the user can join
|
# If the password was correct, check if the user can join
|
||||||
if player_can_join_lexicon(current_user, g.lexicon, form.password.data):
|
user: User = current_user
|
||||||
add_player_to_lexicon(current_user, g.lexicon)
|
try:
|
||||||
return redirect(url_for('session.session', name=name))
|
memq.create(db, user.id, lexicon.id, is_editor=False)
|
||||||
else:
|
return redirect(url_for("session.session", name=name))
|
||||||
flash('Could not join game')
|
except ArgumentError:
|
||||||
return redirect(url_for('home.home', name=name))
|
flash("Could not join game")
|
||||||
|
return redirect(url_for("home.home", name=name))
|
||||||
|
|
||||||
|
|
||||||
@bp_lexicon.route('/contents/', methods=['GET'])
|
@bp.get("/contents/")
|
||||||
@lexicon_param
|
@lexicon_param
|
||||||
@player_required_if_not_public
|
@player_required_if_not_public
|
||||||
def contents(name):
|
def contents(name):
|
||||||
with g.lexicon.ctx.read('info') as info:
|
# indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list)
|
||||||
indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list)
|
# for articles in indexed.values():
|
||||||
for articles in indexed.values():
|
# for i in range(len(articles)):
|
||||||
for i in range(len(articles)):
|
# articles[i] = {
|
||||||
articles[i] = {
|
# 'title': articles[i],
|
||||||
'title': articles[i],
|
# **info.get(articles[i])}
|
||||||
**info.get(articles[i])}
|
return render_template("lexicon.contents.jinja")
|
||||||
return render_template('lexicon.contents.jinja', indexed=indexed)
|
|
||||||
|
|
||||||
|
|
||||||
@bp_lexicon.route('/article/<title>')
|
@bp.get("/article/<title>")
|
||||||
@lexicon_param
|
@lexicon_param
|
||||||
@player_required_if_not_public
|
@player_required_if_not_public
|
||||||
def article(name, title):
|
def article(name, title):
|
||||||
with g.lexicon.ctx.article.read(title) as a:
|
# article = {**a, 'html': Markup(a['html'])}
|
||||||
article = {**a, 'html': Markup(a['html'])}
|
return render_template("lexicon.article.jinja")
|
||||||
return render_template('lexicon.article.jinja', article=article)
|
|
||||||
|
|
||||||
|
|
||||||
@bp_lexicon.route('/rules/', methods=['GET'])
|
@bp.get("/rules/")
|
||||||
@lexicon_param
|
@lexicon_param
|
||||||
@player_required_if_not_public
|
@player_required_if_not_public
|
||||||
def rules(name):
|
def rules(name):
|
||||||
return render_template('lexicon.rules.jinja')
|
return render_template("lexicon.rules.jinja")
|
||||||
|
|
||||||
|
|
||||||
@bp_lexicon.route('/statistics/', methods=['GET'])
|
@bp.get("/statistics/")
|
||||||
@lexicon_param
|
@lexicon_param
|
||||||
@player_required_if_not_public
|
@player_required_if_not_public
|
||||||
def stats(name):
|
def stats(name):
|
||||||
return render_template('lexicon.statistics.jinja')
|
return render_template("lexicon.statistics.jinja")
|
||||||
|
|
|
@ -4,5 +4,6 @@ from wtforms import StringField, SubmitField
|
||||||
|
|
||||||
class LexiconJoinForm(FlaskForm):
|
class LexiconJoinForm(FlaskForm):
|
||||||
"""/lexicon/<name>/join/"""
|
"""/lexicon/<name>/join/"""
|
||||||
password = StringField('Password')
|
|
||||||
submit = SubmitField('Submit')
|
password = StringField("Password")
|
||||||
|
submit = SubmitField("Submit")
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
{% extends "lexicon.jinja" %}
|
{% extends "lexicon.jinja" %}
|
||||||
{% block title %}Join | {{ lexicon_title }}{% endblock %}
|
{% block title %}Join | {{ g.lexicon.full_title }}{% endblock %}
|
||||||
|
|
||||||
{% block main %}
|
{% block main %}
|
||||||
|
|
||||||
<form id="lexicon-join" action="" method="post" novalidate>
|
<form id="lexicon-join" action="" method="post" novalidate>
|
||||||
{{ form.hidden_tag() }}
|
{{ form.hidden_tag() }}
|
||||||
{% if g.lexicon.cfg.join.password %}
|
{% if g.lexicon.join_password %}
|
||||||
<p>{{ form.password.label }}<br>{{ form.password(size=32) }}</p>
|
<p>{{ form.password.label }}<br>{{ form.password(size=32) }}</p>
|
||||||
|
{% else %}
|
||||||
|
<p>Join {{ g.lexicon.full_title }}?</p>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
<p>{{ form.submit() }}</p>
|
<p>{{ form.submit() }}</p>
|
||||||
</form>
|
</form>
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
|
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
|
||||||
<p>
|
<p>
|
||||||
<span class="dashboard-lexicon-item-title">
|
<span class="dashboard-lexicon-item-title">
|
||||||
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
|
<a href="{{ url_for('lexicon.contents', name=lexicon.name) }}">{{ lexicon.full_title }}</a>
|
||||||
</span>
|
</span>
|
||||||
[{{ status.capitalize() }}]
|
[{{ status.capitalize() }}]
|
||||||
</p>
|
</p>
|
||||||
|
@ -29,7 +29,7 @@
|
||||||
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
|
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
|
||||||
{%-
|
{%-
|
||||||
if lexicon.public and lexicon.joinable
|
if lexicon.public and lexicon.joinable
|
||||||
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
|
%} / <a href="{{ url_for('lexicon.join', name=lexicon.name) }}">Join game</a>
|
||||||
{%- endif -%}
|
{%- endif -%}
|
||||||
{%- endif -%}
|
{%- endif -%}
|
||||||
</p>
|
</p>
|
||||||
|
|
2
mypy.ini
2
mypy.ini
|
@ -1,4 +1,4 @@
|
||||||
[mypy]
|
[mypy]
|
||||||
ignore_missing_imports = true
|
ignore_missing_imports = true
|
||||||
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
|
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
|
||||||
; mypy stable doesn't support pyproject.toml yet
|
; mypy stable doesn't support pyproject.toml yet
|
|
@ -22,11 +22,11 @@ amanuensis-cli = "amanuensis.cli:main"
|
||||||
amanuensis-server = "amanuensis.server:run"
|
amanuensis-server = "amanuensis.server:run"
|
||||||
|
|
||||||
[tool.black]
|
[tool.black]
|
||||||
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/lexicon/.*|^/amanuensis/server/session/.*|"
|
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/session/.*|"
|
||||||
|
|
||||||
[tool.mypy]
|
[tool.mypy]
|
||||||
ignore_missing_imports = true
|
ignore_missing_imports = true
|
||||||
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
|
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
addopts = "--show-capture=log"
|
addopts = "--show-capture=log"
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import pytest
|
import pytest
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from amanuensis.db import DbContext
|
from amanuensis.backend import artiq
|
||||||
from amanuensis.db.models import Character, Lexicon, User
|
from amanuensis.db import DbContext, Character, Lexicon, User
|
||||||
import amanuensis.backend.article as artiq
|
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError
|
||||||
from tests.conftest import ObjectFactory
|
from tests.conftest import ObjectFactory
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from amanuensis.backend import charq
|
||||||
from amanuensis.db import *
|
from amanuensis.db import *
|
||||||
import amanuensis.backend.character as charq
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
from amanuensis.errors import ArgumentError
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_character(db: DbContext, lexicon_with_editor, make):
|
def test_create_character(db: DbContext, lexicon_with_editor, make):
|
||||||
|
@ -20,13 +20,13 @@ def test_create_character(db: DbContext, lexicon_with_editor, make):
|
||||||
kwargs: dict
|
kwargs: dict
|
||||||
|
|
||||||
# Bad argument types
|
# Bad argument types
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "name": b"bytestring"}
|
kwargs = {**defaults, "name": b"bytestring"}
|
||||||
charq.create(**kwargs)
|
charq.create(**kwargs)
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "name": None}
|
kwargs = {**defaults, "name": None}
|
||||||
charq.create(**kwargs)
|
charq.create(**kwargs)
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "signature": b"bytestring"}
|
kwargs = {**defaults, "signature": b"bytestring"}
|
||||||
charq.create(**kwargs)
|
charq.create(**kwargs)
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
from amanuensis.db.models import IndexType
|
from amanuensis.db.models import IndexType
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import amanuensis.backend.index as indq
|
from amanuensis.backend import indq
|
||||||
from amanuensis.db import DbContext, Lexicon, User
|
from amanuensis.db import DbContext, Lexicon
|
||||||
|
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,9 @@ import time
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import amanuensis.backend.lexicon as lexiq
|
from amanuensis.backend import lexiq
|
||||||
from amanuensis.db import DbContext, Lexicon, User
|
from amanuensis.db import DbContext, Lexicon, User
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
from tests.conftest import ObjectFactory
|
from tests.conftest import ObjectFactory
|
||||||
|
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ def test_create_lexicon(db: DbContext):
|
||||||
kwargs: dict
|
kwargs: dict
|
||||||
|
|
||||||
# Test name constraints
|
# Test name constraints
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "name": None}
|
kwargs = {**defaults, "name": None}
|
||||||
lexiq.create(**kwargs)
|
lexiq.create(**kwargs)
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(ArgumentError):
|
||||||
|
@ -58,8 +58,8 @@ def test_lexicon_from(db: DbContext, make: ObjectFactory):
|
||||||
"""Test lexiq.from_*."""
|
"""Test lexiq.from_*."""
|
||||||
lexicon1: Lexicon = make.lexicon()
|
lexicon1: Lexicon = make.lexicon()
|
||||||
lexicon2: Lexicon = make.lexicon()
|
lexicon2: Lexicon = make.lexicon()
|
||||||
assert lexiq.from_name(db, lexicon1.name) == lexicon1
|
assert lexiq.try_from_name(db, lexicon1.name) == lexicon1
|
||||||
assert lexiq.from_name(db, lexicon2.name) == lexicon2
|
assert lexiq.try_from_name(db, lexicon2.name) == lexicon2
|
||||||
|
|
||||||
|
|
||||||
def test_get_lexicon(db: DbContext, make: ObjectFactory):
|
def test_get_lexicon(db: DbContext, make: ObjectFactory):
|
||||||
|
|
|
@ -2,9 +2,9 @@ import pytest
|
||||||
|
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
|
|
||||||
|
from amanuensis.backend import memq
|
||||||
from amanuensis.db import *
|
from amanuensis.db import *
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError
|
||||||
import amanuensis.backend.membership as memq
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_membership(db: DbContext, make):
|
def test_create_membership(db: DbContext, make):
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from amanuensis.backend import postq
|
||||||
from amanuensis.db import DbContext
|
from amanuensis.db import DbContext
|
||||||
import amanuensis.backend.post as postq
|
|
||||||
|
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def test_create_post(db: DbContext, lexicon_with_editor):
|
def test_create_post(db: DbContext, lexicon_with_editor):
|
||||||
|
@ -20,19 +20,16 @@ def test_create_post(db: DbContext, lexicon_with_editor):
|
||||||
kwargs: dict
|
kwargs: dict
|
||||||
|
|
||||||
# ids are integers
|
# ids are integers
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "user_id": "zero"}
|
kwargs = {**defaults, "user_id": "zero"}
|
||||||
postq.create(**kwargs)
|
postq.create(**kwargs)
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "lexicon_id": "zero"}
|
kwargs = {**defaults, "lexicon_id": "zero"}
|
||||||
postq.create(**kwargs)
|
postq.create(**kwargs)
|
||||||
|
|
||||||
# empty arguments don't work
|
# empty arguments don't work
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "lexicon_id": ""}
|
kwargs = {**defaults, "lexicon_id": None}
|
||||||
postq.create(**kwargs)
|
|
||||||
with pytest.raises(ArgumentError):
|
|
||||||
kwargs = {**defaults, "user_id": ""}
|
|
||||||
postq.create(**kwargs)
|
postq.create(**kwargs)
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(ArgumentError):
|
||||||
kwargs = {**defaults, "body": ""}
|
kwargs = {**defaults, "body": ""}
|
||||||
|
|
|
@ -2,9 +2,9 @@ import os
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import amanuensis.backend.user as userq
|
from amanuensis.backend import userq
|
||||||
from amanuensis.db import DbContext, User
|
from amanuensis.db import DbContext, User
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
|
||||||
|
|
||||||
|
|
||||||
def test_create_user(db: DbContext):
|
def test_create_user(db: DbContext):
|
||||||
|
@ -33,7 +33,7 @@ def test_create_user(db: DbContext):
|
||||||
userq.create(**kwargs)
|
userq.create(**kwargs)
|
||||||
|
|
||||||
# No password
|
# No password
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(BackendArgumentTypeError):
|
||||||
kwargs = {**defaults, "password": None}
|
kwargs = {**defaults, "password": None}
|
||||||
userq.create(**kwargs)
|
userq.create(**kwargs)
|
||||||
|
|
||||||
|
@ -57,10 +57,10 @@ def test_user_from(db: DbContext, make):
|
||||||
"""Test userq.from_*."""
|
"""Test userq.from_*."""
|
||||||
user1: User = make.user()
|
user1: User = make.user()
|
||||||
user2: User = make.user()
|
user2: User = make.user()
|
||||||
assert userq.from_id(db, user1.id) == user1
|
assert userq.try_from_id(db, user1.id) == user1
|
||||||
assert userq.from_username(db, user1.username) == user1
|
assert userq.try_from_username(db, user1.username) == user1
|
||||||
assert userq.from_id(db, user2.id) == user2
|
assert userq.try_from_id(db, user2.id) == user2
|
||||||
assert userq.from_username(db, user2.username) == user2
|
assert userq.try_from_username(db, user2.username) == user2
|
||||||
|
|
||||||
|
|
||||||
def test_user_password(db: DbContext, make):
|
def test_user_password(db: DbContext, make):
|
||||||
|
|
|
@ -10,10 +10,7 @@ from bs4 import BeautifulSoup
|
||||||
from flask.testing import FlaskClient
|
from flask.testing import FlaskClient
|
||||||
from sqlalchemy.orm.session import close_all_sessions
|
from sqlalchemy.orm.session import close_all_sessions
|
||||||
|
|
||||||
import amanuensis.backend.character as charq
|
from amanuensis.backend import charq, lexiq, memq, userq
|
||||||
import amanuensis.backend.lexicon as lexiq
|
|
||||||
import amanuensis.backend.membership as memq
|
|
||||||
import amanuensis.backend.user as userq
|
|
||||||
from amanuensis.config import AmanuensisConfig
|
from amanuensis.config import AmanuensisConfig
|
||||||
from amanuensis.db import DbContext, User, Lexicon, Membership, Character
|
from amanuensis.db import DbContext, User, Lexicon, Membership, Character
|
||||||
from amanuensis.server import get_app
|
from amanuensis.server import get_app
|
||||||
|
@ -45,7 +42,7 @@ class UserClient:
|
||||||
|
|
||||||
def login(self, client: FlaskClient):
|
def login(self, client: FlaskClient):
|
||||||
"""Log the user in."""
|
"""Log the user in."""
|
||||||
user: Optional[User] = userq.from_id(self.db, self.user_id)
|
user: Optional[User] = userq.try_from_id(self.db, self.user_id)
|
||||||
assert user is not None
|
assert user is not None
|
||||||
|
|
||||||
# Set the user's password so we know what it is later
|
# Set the user's password so we know what it is later
|
||||||
|
|
Loading…
Reference in New Issue