Compare commits

..

5 Commits

20 changed files with 151 additions and 91 deletions

View File

@ -3,11 +3,11 @@ Lexicon query interface
""" """
import re import re
from typing import Sequence from typing import Sequence, Optional
from sqlalchemy import select, func from sqlalchemy import select, func
from amanuensis.db import DbContext, Lexicon from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.errors import ArgumentError from amanuensis.errors import ArgumentError
@ -17,7 +17,7 @@ RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
def create( def create(
db: DbContext, db: DbContext,
name: str, name: str,
title: str, title: Optional[str],
prompt: str, prompt: str,
) -> Lexicon: ) -> Lexicon:
""" """
@ -55,6 +55,21 @@ def create(
return new_lexicon return new_lexicon
def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]: def from_name(db: DbContext, name: str) -> Lexicon:
"""Get a lexicon by its name."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
def get_all(db: DbContext) -> Sequence[Lexicon]:
"""Get all lexicons.""" """Get all lexicons."""
return db(select(Lexicon)).scalars() return db(select(Lexicon)).scalars()
def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
"""Get all lexicons that a player is in."""
return db(select(Lexicon).join(Lexicon.memberships).where(Membership.user_id == user_id)).scalars()
def get_public(db: DbContext) -> Sequence[Lexicon]:
"""Get all publicly visible lexicons."""
return db(select(Lexicon).where(Lexicon.public == True)).scalars()

View File

@ -21,7 +21,7 @@ def create(
db: DbContext, db: DbContext,
username: str, username: str,
password: str, password: str,
display_name: str, display_name: Optional[str],
email: str, email: str,
is_site_admin: bool, is_site_admin: bool,
) -> User: ) -> User:
@ -71,12 +71,7 @@ def create(
return new_user return new_user
def get_all_users(db: DbContext) -> Sequence[User]: def from_id(db: DbContext, user_id: int) -> Optional[User]:
"""Get all users."""
return db(select(User)).scalars()
def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
""" """
Get a user by the user's id. Get a user by the user's id.
Returns None if no user was found. Returns None if no user was found.
@ -85,7 +80,7 @@ def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
return user return user
def get_user_by_username(db: DbContext, username: str) -> Optional[User]: def from_username(db: DbContext, username: str) -> Optional[User]:
""" """
Get a user by the user's username. Get a user by the user's username.
Returns None if no user was found. Returns None if no user was found.
@ -94,6 +89,11 @@ def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
return user return user
def get_all(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None: def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password.""" """Set a user's password."""
password_hash = generate_password_hash(new_password) password_hash = generate_password_hash(new_password)

View File

@ -1,5 +1,13 @@
from argparse import BooleanOptionalAction
import logging import logging
from sqlalchemy import update
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, Lexicon
from .helpers import add_argument from .helpers import add_argument
@ -9,22 +17,51 @@ COMMAND_HELP = "Interact with lexicons."
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@add_argument("lexicon")
@add_argument("user")
@add_argument("--editor", action="store_true")
def command_add(args) -> int:
db: DbContext = args.get_db()
lexicon = lexiq.from_name(db, args.lexicon)
user = userq.from_username(db, args.user)
assert user is not None
memq.create(db, user.id, lexicon.id, args.editor)
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
return 0
@add_argument("name")
def command_create(args): def command_create(args):
""" """
Create a lexicon. Create a lexicon.
""" """
raise NotImplementedError() db: DbContext = args.get_db()
lexiq.create(db, args.name, None, f"Prompt for Lexicon {args.name}")
LOG.info(f"Created lexicon {args.name}")
return 0
def command_delete(args): @add_argument("name")
@add_argument("--public", action=BooleanOptionalAction)
@add_argument("--join", action=BooleanOptionalAction)
def command_edit(args):
""" """
Delete a lexicon. Update a lexicon's configuration.
""" """
raise NotImplementedError() db: DbContext = args.get_db()
values = {}
if args.public == True:
values["public"] = True
elif args.public == False:
values["public"] = False
def command_list(args): if args.join == True:
""" values["joinable"] = True
List all lexicons and their statuses. elif args.join == False:
""" values["joinable"] = False
raise NotImplementedError()
result = db(update(Lexicon).where(Lexicon.name == args.name).values(**values))
LOG.info(f"Updated {result.rowcount} lexicons")
db.session.commit()
return 0 if result.rowcount == 1 else -1

View File

@ -21,6 +21,7 @@ def command_create(args) -> int:
db: DbContext = args.get_db() db: DbContext = args.get_db()
userq.create(db, args.username, "password", args.username, args.email, False) userq.create(db, args.username, "password", args.username, args.email, False)
userq.password_set(db, args.username, args.password) userq.password_set(db, args.username, args.password)
LOG.info(f"Created user {args.username}")
return 0 return 0
@ -28,7 +29,7 @@ def command_create(args) -> int:
def command_promote(args) -> int: def command_promote(args) -> int:
"""Make a user a site admin.""" """Make a user a site admin."""
db: DbContext = args.get_db() db: DbContext = args.get_db()
user: Optional[User] = userq.get_user_by_username(db, args.username) user: Optional[User] = userq.from_username(db, args.username)
if user is None: if user is None:
args.parser.error("User not found") args.parser.error("User not found")
return -1 return -1
@ -45,7 +46,7 @@ def command_promote(args) -> int:
def command_demote(args): def command_demote(args):
"""Revoke a user's site admin status.""" """Revoke a user's site admin status."""
db: DbContext = args.get_db() db: DbContext = args.get_db()
user: Optional[User] = userq.get_user_by_username(db, args.username) user: Optional[User] = userq.from_username(db, args.username)
if user is None: if user is None:
args.parser.error("User not found") args.parser.error("User not found")
return -1 return -1

View File

@ -39,10 +39,10 @@ class DbContext:
if path and uri: if path and uri:
raise ValueError("Only one of path and uri may be specified") raise ValueError("Only one of path and uri may be specified")
db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}" self.db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
# Create an engine and enable foreign key constraints in sqlite # Create an engine and enable foreign key constraints in sqlite
self.engine = create_engine(db_uri, echo=echo) self.engine = create_engine(self.db_uri, echo=echo)
@event.listens_for(self.engine, "connect") @event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record): def set_sqlite_pragma(dbapi_connection, connection_record):

View File

@ -4,6 +4,8 @@ import os
from flask import Flask, g from flask import Flask, g
import amanuensis.backend.lexicon
import amanuensis.backend.user
from amanuensis.config import AmanuensisConfig, CommandLineConfig from amanuensis.config import AmanuensisConfig, CommandLineConfig
from amanuensis.db import DbContext from amanuensis.db import DbContext
import amanuensis.server.auth as auth import amanuensis.server.auth as auth
@ -58,6 +60,11 @@ def get_app(
app.template_filter("date")(date_format) app.template_filter("date")(date_format)
def include_backend():
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
app.context_processor(include_backend)
# Set up Flask-Login # Set up Flask-Login
auth.get_login_manager().init_app(app) auth.get_login_manager().init_app(app)

View File

@ -39,7 +39,7 @@ def get_login_manager() -> LoginManager:
user_id = int(user_id_str) user_id = int(user_id_str)
except: except:
return None return None
return userq.get_user_by_id(g.db, user_id) return userq.from_id(g.db, user_id)
login_manager.user_loader(load_user) login_manager.user_loader(load_user)
@ -58,7 +58,7 @@ def login():
# POST with valid data # POST with valid data
username: str = form.username.data username: str = form.username.data
password: str = form.password.data password: str = form.password.data
user: User = userq.get_user_by_username(g.db, username) user: User = userq.from_username(g.db, username)
if not user or not userq.password_check(g.db, username, password): if not user or not userq.password_check(g.db, username, password):
# Bad creds # Bad creds
flash("Login not recognized") flash("Login not recognized")
@ -69,11 +69,11 @@ def login():
login_user(user, remember=remember_me) login_user(user, remember=remember_me)
userq.update_logged_in(g.db, username) userq.update_logged_in(g.db, username)
LOG.info("Logged in user {0.username} ({0.id})".format(user)) LOG.info("Logged in user {0.username} ({0.id})".format(user))
return redirect(url_for("home.admin")) return redirect(url_for("home.home"))
@bp.get("/logout/") @bp.get("/logout/")
@login_required @login_required
def logout(): def logout():
logout_user() logout_user()
return redirect(url_for("home.admin")) return redirect(url_for("home.home"))

View File

@ -1,43 +1,23 @@
from flask import Blueprint, render_template, g from flask import Blueprint, render_template, g
# from flask import Blueprint, render_template, redirect, url_for, current_app
# from flask_login import login_required, current_user
import amanuensis.backend.user as userq import amanuensis.backend.user as userq
import amanuensis.backend.lexicon as lexiq import amanuensis.backend.lexicon as lexiq
# from amanuensis.config import RootConfigDirectoryContext
# from amanuensis.lexicon import create_lexicon, load_all_lexicons
# from amanuensis.models import UserModel, ModelFactory
# from amanuensis.server.helpers import admin_required
# from .forms import LexiconCreateForm # from .forms import LexiconCreateForm
bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".") bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".")
# @bp.get("/") @bp.get("/")
# def home(): def home():
# Show lexicons that are visible to the current user return render_template('home.root.jinja')
# return "TODO"
# user_lexicons = []
# public_lexicons = []
# for lexicon in load_all_lexicons(root):
# if user.uid in lexicon.cfg.join.joined:
# user_lexicons.append(lexicon)
# elif lexicon.cfg.join.public:
# public_lexicons.append(lexicon)
# return render_template(
# 'home.root.jinja',
# user_lexicons=user_lexicons,
# public_lexicons=public_lexicons)
@bp.get("/admin/") @bp.get("/admin/")
# @login_required # @login_required
# @admin_required # @admin_required
def admin(): def admin():
return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq) return render_template("home.admin.jinja", userq=userq, lexiq=lexiq)
# @bp_home.route("/admin/create/", methods=['GET', 'POST']) # @bp_home.route("/admin/create/", methods=['GET', 'POST'])

View File

@ -4,17 +4,17 @@
{% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %} {% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %}
{# TODO #} {# TODO #}
{% block sb_home %}<a href="#{#{ url_for('home.home') }#}">Home</a>{% endblock %} {% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>{% endblock %}
{% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %} {% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %}
{% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %} {% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %}
{% block main %} {% block main %}
<p>Users:</p> <p>Users:</p>
{% for user in userq.get_all_users(db) %} {% for user in userq.get_all(db) %}
{{ macros.dashboard_user_item(user) }} {{ macros.dashboard_user_item(user) }}
{% endfor %} {% endfor %}
<p>Lexicons:</p> <p>Lexicons:</p>
{% for lexicon in lexiq.get_all_lexicons(db) %} {% for lexicon in lexiq.get_all(db) %}
{{ macros.dashboard_lexicon_item(lexicon) }} {{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %} {% endfor %}
{% endblock %} {% endblock %}

View File

@ -11,10 +11,16 @@
<span style="color:#ff0000">{{ message }}</span><br> <span style="color:#ff0000">{{ message }}</span><br>
{% endfor %} {% endfor %}
{% if current_user.is_authenticated %}
{% set joined = lexiq.get_joined(db, current_user.id)|list %}
{% else %}
{% set joined = [] %}
{% endif %}
{% if current_user.is_authenticated %} {% if current_user.is_authenticated %}
<h2>Your games</h2> <h2>Your games</h2>
{% if user_lexicons %} {% if joined %}
{% for lexicon in user_lexicons %} {% for lexicon in joined %}
{{ macros.dashboard_lexicon_item(lexicon) }} {{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %} {% endfor %}
{% else %} {% else %}
@ -22,9 +28,10 @@
{% endif %} {% endif %}
{% endif %} {% endif %}
{% set public = lexiq.get_public(db)|reject("in", joined)|list %}
<h2>Public games</h2> <h2>Public games</h2>
{% if public_lexicons %} {% if public %}
{% for lexicon in public_lexicons %} {% for lexicon in public %}
{{ macros.dashboard_lexicon_item(lexicon) }} {{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %} {% endfor %}
{% else %} {% else %}
@ -34,7 +41,7 @@
{% endblock %} {% endblock %}
{% set template_content_blocks = [self.main()] %} {% set template_content_blocks = [self.main()] %}
{% if current_user.cfg.is_admin %} {% if current_user.is_site_admin %}
{% block admin_dash %} {% block admin_dash %}
<a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a> <a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a>
{% endblock %} {% endblock %}

View File

@ -3,33 +3,35 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}"> <div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p> <p>
<span class="dashboard-lexicon-item-title"> <span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}"> <a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
{{ lexicon.full_title }}</a>
</span> </span>
[{{ lexicon.status.capitalize() }}] [{{ status.capitalize() }}]
</p> </p>
<p><i>{{ lexicon.prompt }}</i></p> <p><i>{{ lexicon.prompt }}</i></p>
{% if current_user.is_authenticated %} {% if current_user.is_authenticated %}
<p> <p>
{# TODO #} {#-
{# {% Show detailed player information if the current user is a member of the lexicon or if the current user is a site admin. The filter sequence must be converted to a list because it returns a generator, which is truthy.
if current_user.uid in lexicon.cfg.join.joined -#}
{%-
if lexicon.memberships|map(attribute="user_id")|select("equalto", current_user.id)|list
or current_user.is_site_admin or current_user.is_site_admin
%} #} -%}
Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} / Editor: {{
Players: lexicon.memberships|selectattr("is_editor")|map(attribute="user")|map(attribute="username")|join(", ")
{# {% for uid in lexicon.cfg.join.joined %} #} }} / Players: {{
{# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #} lexicon.memberships|map(attribute="user")|map(attribute="username")|join(", ")
{# {% endfor %} #} }} ({{ lexicon.memberships|count }}
{# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #} {%- if lexicon.player_limit is not none -%}
{# {% else %} #} /{{ lexicon.player_limit }}
{# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #} {%- endif -%})
{# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #} {%- else -%}
{# / <a href="{{ url_for('lexicon.join', name=lexicon.cfg.name) }}"> #} Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{# Join game #} {%-
{# </a> #} if lexicon.public and lexicon.joinable
{# {% endif %} #} %} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
{# {% endif %} #} {%- endif -%}
{%- endif -%}
</p> </p>
{% endif %} {% endif %}
</div> </div>

View File

@ -1,22 +1,35 @@
""" """
pytest test fixtures pytest test fixtures
""" """
import os
import pytest import pytest
import tempfile
from sqlalchemy.orm.session import close_all_sessions
from amanuensis.db import DbContext
import amanuensis.backend.character as charq import amanuensis.backend.character as charq
import amanuensis.backend.lexicon as lexiq import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq import amanuensis.backend.user as userq
from amanuensis.config import AmanuensisConfig from amanuensis.config import AmanuensisConfig
from amanuensis.db import DbContext
from amanuensis.server import get_app from amanuensis.server import get_app
@pytest.fixture @pytest.fixture
def db() -> DbContext: def db(request) -> DbContext:
"""Provides an initialized database in memory.""" """Provides a fully-initialized ephemeral database."""
db = DbContext(uri="sqlite:///:memory:", echo=False) db_fd, db_path = tempfile.mkstemp()
db = DbContext(path=db_path, echo=False)
db.create_all() db.create_all()
def db_teardown():
close_all_sessions()
os.close(db_fd)
os.unlink(db_path)
request.addfinalizer(db_teardown)
return db return db
@ -128,12 +141,10 @@ def lexicon_with_editor(make):
class TestConfig(AmanuensisConfig): class TestConfig(AmanuensisConfig):
TESTING = True TESTING = True
SECRET_KEY = "secret key" SECRET_KEY = os.urandom(32).hex()
DATABASE_URI = "sqlite:///:memory:"
@pytest.fixture @pytest.fixture
def app(db): def app(db: DbContext):
"""Provides an application running on top of the test database.""" """Provides an application running on top of the test database."""
server_app = get_app(TestConfig, db) return get_app(TestConfig(), db)
return server_app