Compare commits

..

No commits in common. "ba346c29bc8d6deb9ce63ae29a1f9fefd823cedf" and "0e35f15a3a28feb11fffa5f080f213f246b97af4" have entirely different histories.

20 changed files with 91 additions and 151 deletions

View File

@ -3,11 +3,11 @@ Lexicon query interface
"""
import re
from typing import Sequence, Optional
from typing import Sequence
from sqlalchemy import select, func
from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.db import DbContext, Lexicon
from amanuensis.errors import ArgumentError
@ -17,7 +17,7 @@ RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
def create(
db: DbContext,
name: str,
title: Optional[str],
title: str,
prompt: str,
) -> Lexicon:
"""
@ -55,21 +55,6 @@ def create(
return new_lexicon
def from_name(db: DbContext, name: str) -> Lexicon:
"""Get a lexicon by its name."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
def get_all(db: DbContext) -> Sequence[Lexicon]:
def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]:
"""Get all lexicons."""
return db(select(Lexicon)).scalars()
def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
"""Get all lexicons that a player is in."""
return db(select(Lexicon).join(Lexicon.memberships).where(Membership.user_id == user_id)).scalars()
def get_public(db: DbContext) -> Sequence[Lexicon]:
"""Get all publicly visible lexicons."""
return db(select(Lexicon).where(Lexicon.public == True)).scalars()

View File

@ -21,7 +21,7 @@ def create(
db: DbContext,
username: str,
password: str,
display_name: Optional[str],
display_name: str,
email: str,
is_site_admin: bool,
) -> User:
@ -71,7 +71,12 @@ def create(
return new_user
def from_id(db: DbContext, user_id: int) -> Optional[User]:
def get_all_users(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
"""
Get a user by the user's id.
Returns None if no user was found.
@ -80,7 +85,7 @@ def from_id(db: DbContext, user_id: int) -> Optional[User]:
return user
def from_username(db: DbContext, username: str) -> Optional[User]:
def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
"""
Get a user by the user's username.
Returns None if no user was found.
@ -89,11 +94,6 @@ def from_username(db: DbContext, username: str) -> Optional[User]:
return user
def get_all(db: DbContext) -> Sequence[User]:
"""Get all users."""
return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)

View File

@ -1,13 +1,5 @@
from argparse import BooleanOptionalAction
import logging
from sqlalchemy import update
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.db import DbContext, Lexicon
from .helpers import add_argument
@ -17,51 +9,22 @@ COMMAND_HELP = "Interact with lexicons."
LOG = logging.getLogger(__name__)
@add_argument("lexicon")
@add_argument("user")
@add_argument("--editor", action="store_true")
def command_add(args) -> int:
db: DbContext = args.get_db()
lexicon = lexiq.from_name(db, args.lexicon)
user = userq.from_username(db, args.user)
assert user is not None
memq.create(db, user.id, lexicon.id, args.editor)
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
return 0
@add_argument("name")
def command_create(args):
"""
Create a lexicon.
"""
db: DbContext = args.get_db()
lexiq.create(db, args.name, None, f"Prompt for Lexicon {args.name}")
LOG.info(f"Created lexicon {args.name}")
return 0
raise NotImplementedError()
@add_argument("name")
@add_argument("--public", action=BooleanOptionalAction)
@add_argument("--join", action=BooleanOptionalAction)
def command_edit(args):
def command_delete(args):
"""
Update a lexicon's configuration.
Delete a lexicon.
"""
db: DbContext = args.get_db()
values = {}
raise NotImplementedError()
if args.public == True:
values["public"] = True
elif args.public == False:
values["public"] = False
if args.join == True:
values["joinable"] = True
elif args.join == False:
values["joinable"] = False
result = db(update(Lexicon).where(Lexicon.name == args.name).values(**values))
LOG.info(f"Updated {result.rowcount} lexicons")
db.session.commit()
return 0 if result.rowcount == 1 else -1
def command_list(args):
"""
List all lexicons and their statuses.
"""
raise NotImplementedError()

View File

@ -21,7 +21,6 @@ def command_create(args) -> int:
db: DbContext = args.get_db()
userq.create(db, args.username, "password", args.username, args.email, False)
userq.password_set(db, args.username, args.password)
LOG.info(f"Created user {args.username}")
return 0
@ -29,7 +28,7 @@ def command_create(args) -> int:
def command_promote(args) -> int:
"""Make a user a site admin."""
db: DbContext = args.get_db()
user: Optional[User] = userq.from_username(db, args.username)
user: Optional[User] = userq.get_user_by_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1
@ -46,7 +45,7 @@ def command_promote(args) -> int:
def command_demote(args):
"""Revoke a user's site admin status."""
db: DbContext = args.get_db()
user: Optional[User] = userq.from_username(db, args.username)
user: Optional[User] = userq.get_user_by_username(db, args.username)
if user is None:
args.parser.error("User not found")
return -1

View File

@ -39,10 +39,10 @@ class DbContext:
if path and uri:
raise ValueError("Only one of path and uri may be specified")
self.db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
# Create an engine and enable foreign key constraints in sqlite
self.engine = create_engine(self.db_uri, echo=echo)
self.engine = create_engine(db_uri, echo=echo)
@event.listens_for(self.engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):

View File

@ -4,8 +4,6 @@ import os
from flask import Flask, g
import amanuensis.backend.lexicon
import amanuensis.backend.user
from amanuensis.config import AmanuensisConfig, CommandLineConfig
from amanuensis.db import DbContext
import amanuensis.server.auth as auth
@ -60,11 +58,6 @@ def get_app(
app.template_filter("date")(date_format)
def include_backend():
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
app.context_processor(include_backend)
# Set up Flask-Login
auth.get_login_manager().init_app(app)

View File

@ -39,7 +39,7 @@ def get_login_manager() -> LoginManager:
user_id = int(user_id_str)
except:
return None
return userq.from_id(g.db, user_id)
return userq.get_user_by_id(g.db, user_id)
login_manager.user_loader(load_user)
@ -58,7 +58,7 @@ def login():
# POST with valid data
username: str = form.username.data
password: str = form.password.data
user: User = userq.from_username(g.db, username)
user: User = userq.get_user_by_username(g.db, username)
if not user or not userq.password_check(g.db, username, password):
# Bad creds
flash("Login not recognized")
@ -69,11 +69,11 @@ def login():
login_user(user, remember=remember_me)
userq.update_logged_in(g.db, username)
LOG.info("Logged in user {0.username} ({0.id})".format(user))
return redirect(url_for("home.home"))
return redirect(url_for("home.admin"))
@bp.get("/logout/")
@login_required
def logout():
logout_user()
return redirect(url_for("home.home"))
return redirect(url_for("home.admin"))

View File

@ -1,23 +1,43 @@
from flask import Blueprint, render_template, g
# from flask import Blueprint, render_template, redirect, url_for, current_app
# from flask_login import login_required, current_user
import amanuensis.backend.user as userq
import amanuensis.backend.lexicon as lexiq
# from amanuensis.config import RootConfigDirectoryContext
# from amanuensis.lexicon import create_lexicon, load_all_lexicons
# from amanuensis.models import UserModel, ModelFactory
# from amanuensis.server.helpers import admin_required
# from .forms import LexiconCreateForm
bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".")
@bp.get("/")
def home():
return render_template('home.root.jinja')
# @bp.get("/")
# def home():
# Show lexicons that are visible to the current user
# return "TODO"
# user_lexicons = []
# public_lexicons = []
# for lexicon in load_all_lexicons(root):
# if user.uid in lexicon.cfg.join.joined:
# user_lexicons.append(lexicon)
# elif lexicon.cfg.join.public:
# public_lexicons.append(lexicon)
# return render_template(
# 'home.root.jinja',
# user_lexicons=user_lexicons,
# public_lexicons=public_lexicons)
@bp.get("/admin/")
# @login_required
# @admin_required
def admin():
return render_template("home.admin.jinja", userq=userq, lexiq=lexiq)
return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq)
# @bp_home.route("/admin/create/", methods=['GET', 'POST'])

View File

@ -4,17 +4,17 @@
{% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %}
{# TODO #}
{% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>{% endblock %}
{% block sb_home %}<a href="#{#{ url_for('home.home') }#}">Home</a>{% endblock %}
{% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %}
{% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %}
{% block main %}
<p>Users:</p>
{% for user in userq.get_all(db) %}
{% for user in userq.get_all_users(db) %}
{{ macros.dashboard_user_item(user) }}
{% endfor %}
<p>Lexicons:</p>
{% for lexicon in lexiq.get_all(db) %}
{% for lexicon in lexiq.get_all_lexicons(db) %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% endblock %}

View File

@ -11,16 +11,10 @@
<span style="color:#ff0000">{{ message }}</span><br>
{% endfor %}
{% if current_user.is_authenticated %}
{% set joined = lexiq.get_joined(db, current_user.id)|list %}
{% else %}
{% set joined = [] %}
{% endif %}
{% if current_user.is_authenticated %}
<h2>Your games</h2>
{% if joined %}
{% for lexicon in joined %}
{% if user_lexicons %}
{% for lexicon in user_lexicons %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -28,10 +22,9 @@
{% endif %}
{% endif %}
{% set public = lexiq.get_public(db)|reject("in", joined)|list %}
<h2>Public games</h2>
{% if public %}
{% for lexicon in public %}
{% if public_lexicons %}
{% for lexicon in public_lexicons %}
{{ macros.dashboard_lexicon_item(lexicon) }}
{% endfor %}
{% else %}
@ -41,7 +34,7 @@
{% endblock %}
{% set template_content_blocks = [self.main()] %}
{% if current_user.is_site_admin %}
{% if current_user.cfg.is_admin %}
{% block admin_dash %}
<a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a>
{% endblock %}

View File

@ -3,35 +3,33 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p>
<span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">
{{ lexicon.full_title }}</a>
</span>
[{{ status.capitalize() }}]
[{{ lexicon.status.capitalize() }}]
</p>
<p><i>{{ lexicon.prompt }}</i></p>
{% if current_user.is_authenticated %}
<p>
{#-
Show detailed player information if the current user is a member of the lexicon or if the current user is a site admin. The filter sequence must be converted to a list because it returns a generator, which is truthy.
-#}
{%-
if lexicon.memberships|map(attribute="user_id")|select("equalto", current_user.id)|list
{# TODO #}
{# {%
if current_user.uid in lexicon.cfg.join.joined
or current_user.is_site_admin
-%}
Editor: {{
lexicon.memberships|selectattr("is_editor")|map(attribute="user")|map(attribute="username")|join(", ")
}} / Players: {{
lexicon.memberships|map(attribute="user")|map(attribute="username")|join(", ")
}} ({{ lexicon.memberships|count }}
{%- if lexicon.player_limit is not none -%}
/{{ lexicon.player_limit }}
{%- endif -%})
{%- else -%}
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{%-
if lexicon.public and lexicon.joinable
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
{%- endif -%}
{%- endif -%}
%} #}
Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} /
Players:
{# {% for uid in lexicon.cfg.join.joined %} #}
{# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #}
{# {% endfor %} #}
{# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #}
{# {% else %} #}
{# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #}
{# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #}
{# / <a href="{{ url_for('lexicon.join', name=lexicon.cfg.name) }}"> #}
{# Join game #}
{# </a> #}
{# {% endif %} #}
{# {% endif %} #}
</p>
{% endif %}
</div>

View File

@ -1,35 +1,22 @@
"""
pytest test fixtures
"""
import os
import pytest
import tempfile
from sqlalchemy.orm.session import close_all_sessions
from amanuensis.db import DbContext
import amanuensis.backend.character as charq
import amanuensis.backend.lexicon as lexiq
import amanuensis.backend.membership as memq
import amanuensis.backend.user as userq
from amanuensis.config import AmanuensisConfig
from amanuensis.db import DbContext
from amanuensis.server import get_app
@pytest.fixture
def db(request) -> DbContext:
"""Provides a fully-initialized ephemeral database."""
db_fd, db_path = tempfile.mkstemp()
db = DbContext(path=db_path, echo=False)
def db() -> DbContext:
"""Provides an initialized database in memory."""
db = DbContext(uri="sqlite:///:memory:", echo=False)
db.create_all()
def db_teardown():
close_all_sessions()
os.close(db_fd)
os.unlink(db_path)
request.addfinalizer(db_teardown)
return db
@ -141,10 +128,12 @@ def lexicon_with_editor(make):
class TestConfig(AmanuensisConfig):
TESTING = True
SECRET_KEY = os.urandom(32).hex()
SECRET_KEY = "secret key"
DATABASE_URI = "sqlite:///:memory:"
@pytest.fixture
def app(db: DbContext):
def app(db):
"""Provides an application running on top of the test database."""
return get_app(TestConfig(), db)
server_app = get_app(TestConfig, db)
return server_app