Compare commits

..

5 Commits

20 changed files with 151 additions and 91 deletions

View File

@ -3,11 +3,11 @@ Lexicon query interface
"""
import re
from typing import Sequence
from typing import Sequence, Optional
from sqlalchemy import select, func
from amanuensis.db import DbContext, Lexicon
from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.errors import ArgumentError
@ -17,7 +17,7 @@ RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
def create(
db: DbContext,
name: str,
title: str,
title: Optional[str],
prompt: str,
) -> Lexicon:
"""
@ -55,6 +55,21 @@ def create(
return new_lexicon
def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]:
def from_name(db: DbContext, name: str) -> Lexicon:
"""Get a lexicon by its name."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
def get_all(db: DbContext) -> Sequence[Lexicon]:
"""Get all lexicons."""
return db(select(Lexicon)).scalars()
def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
"""Get all lexicons that a player is in."""
return db(select(Lexicon).join(Lexicon.memberships).where(Membership.user_id == user_id)).scalars()
def get_public(db: DbContext) -> Sequence[Lexicon]:
"""Get all publicly visible lexicons."""
return db(select(Lexicon).where(Lexicon.public == True)).scalars()

View File

@ -21,7 +21,7 @@ def create(
db: DbContext,
username: str,
password: str,
display_name: str,
display_name: Optional[str],
email: str,
is_site_admin: bool,
) -> User:
@ -71,12 +71,7 @@ def create(
return new_user
def get_all_users(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
def from_id(db: DbContext, user_id: int) -> Optional[User]:
"""
Get a user by the user's id.
Returns None if no user was found.
@ -85,7 +80,7 @@ def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
return user
def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
def from_username(db: DbContext, username: str) -> Optional[User]:
"""
Get a user by the user's username.
Returns None if no user was found.
@ -94,6 +89,11 @@ def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
return user
def get_all(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)

View File

@ -1,5 +1,13 @@
from argparse import BooleanOptionalAction
import logging
from sqlalchemy import update
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, Lexicon
from .helpers import add_argument
@ -9,22 +17,51 @@ COMMAND_HELP = "Interact with lexicons."
LOG = logging.getLogger(__name__)
@add_argument("lexicon")
@add_argument("user")
@add_argument("--editor", action="store_true")
def command_add(args) -> int:
db: DbContext = args.get_db()
lexicon = lexiq.from_name(db, args.lexicon)
user = userq.from_username(db, args.user)
assert user is not None
memq.create(db, user.id, lexicon.id, args.editor)
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
return 0
@add_argument("name")
def command_create(args):
"""
Create a lexicon.
"""
raise NotImplementedError()
db: DbContext = args.get_db()
lexiq.create(db, args.name, None, f"Prompt for Lexicon {args.name}")
LOG.info(f"Created lexicon {args.name}")
return 0
def command_delete(args):
@add_argument("name")
@add_argument("--public", action=BooleanOptionalAction)
@add_argument("--join", action=BooleanOptionalAction)
def command_edit(args):
"""
Delete a lexicon.
Update a lexicon's configuration.
"""
raise NotImplementedError()
db: DbContext = args.get_db()
values = {}
if args.public == True:
values["public"] = True
elif args.public == False:
values["public"] = False
def command_list(args):
"""
List all lexicons and their statuses.
"""
raise NotImplementedError()
if args.join == True:
values["joinable"] = True
elif args.join == False:
values["joinable"] = False
result = db(update(Lexicon).where(Lexicon.name == args.name).values(**values))
LOG.info(f"Updated {result.rowcount} lexicons")
db.session.commit()
return 0 if result.rowcount == 1 else -1

View File

@ -21,6 +21,7 @@ def command_create(args) -> int:
db: DbContext = args.get_db()
userq.create(db, args.username, "password", args.username, args.email, False)
userq.password_set(db, args.username, args.password)
LOG.info(f"Created user {args.username}")
return 0
@ -28,7 +29,7 @@ def command_create(args) -> int:
def command_promote(args) -> int:
"""Make a user a site admin."""
db: DbContext = args.get_db()
user: Optional[User] = userq.get_user_by_username(db, args.username)
user: Optional[User] = userq.from_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1
@ -45,7 +46,7 @@ def command_promote(args) -> int:
def command_demote(args):
"""Revoke a user's site admin status."""
db: DbContext = args.get_db()
user: Optional[User] = userq.get_user_by_username(db, args.username)
user: Optional[User] = userq.from_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1

View File

@ -39,10 +39,10 @@ class DbContext:
if path and uri:
raise ValueError("Only one of path and uri may be specified")
db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
self.db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
# Create an engine and enable foreign key constraints in sqlite
self.engine = create_engine(db_uri, echo=echo)
self.engine = create_engine(self.db_uri, echo=echo)
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):

View File

@ -4,6 +4,8 @@ import os
from flask import Flask, g
import amanuensis.backend.lexicon
import amanuensis.backend.user
from amanuensis.config import AmanuensisConfig, CommandLineConfig
from amanuensis.db import DbContext
import amanuensis.server.auth as auth
@ -58,6 +60,11 @@ def get_app(
app.template_filter("date")(date_format)
def include_backend():
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
app.context_processor(include_backend)
# Set up Flask-Login
auth.get_login_manager().init_app(app)

View File

@ -39,7 +39,7 @@ def get_login_manager() -> LoginManager:
user_id = int(user_id_str)
except:
return None
return userq.get_user_by_id(g.db, user_id)
return userq.from_id(g.db, user_id)
login_manager.user_loader(load_user)
@ -58,7 +58,7 @@ def login():
# POST with valid data
username: str = form.username.data
password: str = form.password.data
user: User = userq.get_user_by_username(g.db, username)
user: User = userq.from_username(g.db, username)
if not user or not userq.password_check(g.db, username, password):
# Bad creds
flash("Login not recognized")
@ -69,11 +69,11 @@ def login():
login_user(user, remember=remember_me)
userq.update_logged_in(g.db, username)
LOG.info("Logged in user {0.username} ({0.id})".format(user))
return redirect(url_for("home.admin"))
return redirect(url_for("home.home"))
@bp.get("/logout/")
@login_required
def logout():
logout_user()
return redirect(url_for("home.admin"))
return redirect(url_for("home.home"))

View File

@ -1,43 +1,23 @@
from flask import Blueprint, render_template, g
# from flask import Blueprint, render_template, redirect, url_for, current_app
# from flask_login import login_required, current_user
import amanuensis.backend.user as userq
import amanuensis.backend.lexicon as lexiq
# from amanuensis.config import RootConfigDirectoryContext
# from amanuensis.lexicon import create_lexicon, load_all_lexicons
# from amanuensis.models import UserModel, ModelFactory
# from amanuensis.server.helpers import admin_required
# from .forms import LexiconCreateForm
bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".")
# @bp.get("/")
# def home():
# Show lexicons that are visible to the current user
# return "TODO"
# user_lexicons = []
# public_lexicons = []
# for lexicon in load_all_lexicons(root):
# if user.uid in lexicon.cfg.join.joined:
# user_lexicons.append(lexicon)
# elif lexicon.cfg.join.public:
# public_lexicons.append(lexicon)
# return render_template(
# 'home.root.jinja',
# user_lexicons=user_lexicons,
# public_lexicons=public_lexicons)
@bp.get("/")
def home():
return render_template('home.root.jinja')
@bp.get("/admin/")
# @login_required
# @admin_required
def admin():
return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq)
return render_template("home.admin.jinja", userq=userq, lexiq=lexiq)
# @bp_home.route("/admin/create/", methods=['GET', 'POST'])

View File

@ -4,17 +4,17 @@
{% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %}
{# TODO #}
{% block sb_home %}<a href="#{#{ url_for('home.home') }#}">Home</a>{% endblock %}
{% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>{% endblock %}
{% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %}
{% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %}
{% block main %}
<p>Users:</p>
{% for user in userq.get_all_users(db) %}
{% for user in userq.get_all(db) %}
{{ macros.dashboard_user_item(user) }}
{% endfor %}
<p>Lexicons:</p>
{% for lexicon in lexiq.get_all_lexicons(db) %}
{% for lexicon in lexiq.get_all(db) %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% endblock %}

View File

@ -11,10 +11,16 @@
<span style="color:#ff0000">{{ message }}</span><br>
{% endfor %}
{% if current_user.is_authenticated %}
{% set joined = lexiq.get_joined(db, current_user.id)|list %}
{% else %}
{% set joined = [] %}
{% endif %}
{% if current_user.is_authenticated %}
<h2>Your games</h2>
{% if user_lexicons %}
{% for lexicon in user_lexicons %}
{% if joined %}
{% for lexicon in joined %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -22,9 +28,10 @@
{% endif %}
{% endif %}
{% set public = lexiq.get_public(db)|reject("in", joined)|list %}
<h2>Public games</h2>
{% if public_lexicons %}
{% for lexicon in public_lexicons %}
{% if public %}
{% for lexicon in public %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -34,7 +41,7 @@
{% endblock %}
{% set template_content_blocks = [self.main()] %}
{% if current_user.cfg.is_admin %}
{% if current_user.is_site_admin %}
{% block admin_dash %}
<a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a>
{% endblock %}

View File

@ -3,33 +3,35 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p>
<span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">
{{ lexicon.full_title }}</a>
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
</span>
[{{ lexicon.status.capitalize() }}]
[{{ status.capitalize() }}]
</p>
<p><i>{{ lexicon.prompt }}</i></p>
{% if current_user.is_authenticated %}
<p>
{# TODO #}
{# {%
if current_user.uid in lexicon.cfg.join.joined
{#-
Show detailed player information if the current user is a member of the lexicon or if the current user is a site admin. The filter sequence must be converted to a list because it returns a generator, which is truthy.
-#}
{%-
if lexicon.memberships|map(attribute="user_id")|select("equalto", current_user.id)|list
or current_user.is_site_admin
%} #}
Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} /
Players:
{# {% for uid in lexicon.cfg.join.joined %} #}
{# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #}
{# {% endfor %} #}
{# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #}
{# {% else %} #}
{# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #}
{# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #}
{# / <a href="{{ url_for('lexicon.join', name=lexicon.cfg.name) }}"> #}
{# Join game #}
{# </a> #}
{# {% endif %} #}
{# {% endif %} #}
-%}
Editor: {{
lexicon.memberships|selectattr("is_editor")|map(attribute="user")|map(attribute="username")|join(", ")
}} / Players: {{
lexicon.memberships|map(attribute="user")|map(attribute="username")|join(", ")
}} ({{ lexicon.memberships|count }}
{%- if lexicon.player_limit is not none -%}
/{{ lexicon.player_limit }}
{%- endif -%})
{%- else -%}
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{%-
if lexicon.public and lexicon.joinable
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
{%- endif -%}
{%- endif -%}
</p>
{% endif %}
</div>

View File

@ -1,22 +1,35 @@
"""
pytest test fixtures
"""
import os
import pytest
import tempfile
from sqlalchemy.orm.session import close_all_sessions
from amanuensis.db import DbContext
import amanuensis.backend.character as charq
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.config import AmanuensisConfig
from amanuensis.db import DbContext
from amanuensis.server import get_app
@pytest.fixture
def db() -> DbContext:
"""Provides an initialized database in memory."""
db = DbContext(uri="sqlite:///:memory:", echo=False)
def db(request) -> DbContext:
"""Provides a fully-initialized ephemeral database."""
db_fd, db_path = tempfile.mkstemp()
db = DbContext(path=db_path, echo=False)
db.create_all()
def db_teardown():
close_all_sessions()
os.close(db_fd)
os.unlink(db_path)
request.addfinalizer(db_teardown)
return db
@ -128,12 +141,10 @@ def lexicon_with_editor(make):
class TestConfig(AmanuensisConfig):
TESTING = True
SECRET_KEY = "secret key"
DATABASE_URI = "sqlite:///:memory:"
SECRET_KEY = os.urandom(32).hex()
@pytest.fixture
def app(db):
def app(db: DbContext):
"""Provides an application running on top of the test database."""
server_app = get_app(TestConfig, db)
return server_app
return get_app(TestConfig(), db)