Compare commits
5 Commits
0e35f15a3a
...
ba346c29bc
Author | SHA1 | Date |
---|---|---|
Tim Van Baak | ba346c29bc | |
Tim Van Baak | b789bad6c0 | |
Tim Van Baak | 3cfc01a9c8 | |
Tim Van Baak | 562d7d8a4b | |
Tim Van Baak | 398b5705f1 |
|
@ -3,11 +3,11 @@ Lexicon query interface
|
|||
"""
|
||||
|
||||
import re
|
||||
from typing import Sequence
|
||||
from typing import Sequence, Optional
|
||||
|
||||
from sqlalchemy import select, func
|
||||
|
||||
from amanuensis.db import DbContext, Lexicon
|
||||
from amanuensis.db import DbContext, Lexicon, Membership
|
||||
from amanuensis.errors import ArgumentError
|
||||
|
||||
|
||||
|
@ -17,7 +17,7 @@ RE_ALPHANUM_DASH_UNDER = re.compile(r"^[A-Za-z0-9-_]*$")
|
|||
def create(
|
||||
db: DbContext,
|
||||
name: str,
|
||||
title: str,
|
||||
title: Optional[str],
|
||||
prompt: str,
|
||||
) -> Lexicon:
|
||||
"""
|
||||
|
@ -55,6 +55,21 @@ def create(
|
|||
return new_lexicon
|
||||
|
||||
|
||||
def get_all_lexicons(db: DbContext) -> Sequence[Lexicon]:
|
||||
def from_name(db: DbContext, name: str) -> Lexicon:
|
||||
"""Get a lexicon by its name."""
|
||||
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one()
|
||||
|
||||
|
||||
def get_all(db: DbContext) -> Sequence[Lexicon]:
|
||||
"""Get all lexicons."""
|
||||
return db(select(Lexicon)).scalars()
|
||||
|
||||
|
||||
def get_joined(db: DbContext, user_id: int) -> Sequence[Lexicon]:
|
||||
"""Get all lexicons that a player is in."""
|
||||
return db(select(Lexicon).join(Lexicon.memberships).where(Membership.user_id == user_id)).scalars()
|
||||
|
||||
|
||||
def get_public(db: DbContext) -> Sequence[Lexicon]:
|
||||
"""Get all publicly visible lexicons."""
|
||||
return db(select(Lexicon).where(Lexicon.public == True)).scalars()
|
||||
|
|
|
@ -21,7 +21,7 @@ def create(
|
|||
db: DbContext,
|
||||
username: str,
|
||||
password: str,
|
||||
display_name: str,
|
||||
display_name: Optional[str],
|
||||
email: str,
|
||||
is_site_admin: bool,
|
||||
) -> User:
|
||||
|
@ -71,12 +71,7 @@ def create(
|
|||
return new_user
|
||||
|
||||
|
||||
def get_all_users(db: DbContext) -> Sequence[User]:
|
||||
"""Get all users."""
|
||||
return db(select(User)).scalars()
|
||||
|
||||
|
||||
def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
|
||||
def from_id(db: DbContext, user_id: int) -> Optional[User]:
|
||||
"""
|
||||
Get a user by the user's id.
|
||||
Returns None if no user was found.
|
||||
|
@ -85,7 +80,7 @@ def get_user_by_id(db: DbContext, user_id: int) -> Optional[User]:
|
|||
return user
|
||||
|
||||
|
||||
def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
|
||||
def from_username(db: DbContext, username: str) -> Optional[User]:
|
||||
"""
|
||||
Get a user by the user's username.
|
||||
Returns None if no user was found.
|
||||
|
@ -94,6 +89,11 @@ def get_user_by_username(db: DbContext, username: str) -> Optional[User]:
|
|||
return user
|
||||
|
||||
|
||||
def get_all(db: DbContext) -> Sequence[User]:
|
||||
"""Get all users."""
|
||||
return db(select(User)).scalars()
|
||||
|
||||
|
||||
def password_set(db: DbContext, username: str, new_password: str) -> None:
|
||||
"""Set a user's password."""
|
||||
password_hash = generate_password_hash(new_password)
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
from argparse import BooleanOptionalAction
|
||||
import logging
|
||||
|
||||
from sqlalchemy import update
|
||||
|
||||
import amanuensis.backend.lexicon as lexiq
|
||||
import amanuensis.backend.membership as memq
|
||||
import amanuensis.backend.user as userq
|
||||
from amanuensis.db import DbContext, Lexicon
|
||||
|
||||
from .helpers import add_argument
|
||||
|
||||
|
||||
|
@ -9,22 +17,51 @@ COMMAND_HELP = "Interact with lexicons."
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@add_argument("lexicon")
|
||||
@add_argument("user")
|
||||
@add_argument("--editor", action="store_true")
|
||||
def command_add(args) -> int:
|
||||
db: DbContext = args.get_db()
|
||||
lexicon = lexiq.from_name(db, args.lexicon)
|
||||
user = userq.from_username(db, args.user)
|
||||
assert user is not None
|
||||
memq.create(db, user.id, lexicon.id, args.editor)
|
||||
LOG.info(f"Added {args.user} to lexicon {args.lexicon}")
|
||||
return 0
|
||||
|
||||
|
||||
@add_argument("name")
|
||||
def command_create(args):
|
||||
"""
|
||||
Create a lexicon.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
db: DbContext = args.get_db()
|
||||
lexiq.create(db, args.name, None, f"Prompt for Lexicon {args.name}")
|
||||
LOG.info(f"Created lexicon {args.name}")
|
||||
return 0
|
||||
|
||||
|
||||
def command_delete(args):
|
||||
@add_argument("name")
|
||||
@add_argument("--public", action=BooleanOptionalAction)
|
||||
@add_argument("--join", action=BooleanOptionalAction)
|
||||
def command_edit(args):
|
||||
"""
|
||||
Delete a lexicon.
|
||||
Update a lexicon's configuration.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
db: DbContext = args.get_db()
|
||||
values = {}
|
||||
|
||||
if args.public == True:
|
||||
values["public"] = True
|
||||
elif args.public == False:
|
||||
values["public"] = False
|
||||
|
||||
def command_list(args):
|
||||
"""
|
||||
List all lexicons and their statuses.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
if args.join == True:
|
||||
values["joinable"] = True
|
||||
elif args.join == False:
|
||||
values["joinable"] = False
|
||||
|
||||
result = db(update(Lexicon).where(Lexicon.name == args.name).values(**values))
|
||||
LOG.info(f"Updated {result.rowcount} lexicons")
|
||||
db.session.commit()
|
||||
return 0 if result.rowcount == 1 else -1
|
||||
|
|
|
@ -21,6 +21,7 @@ def command_create(args) -> int:
|
|||
db: DbContext = args.get_db()
|
||||
userq.create(db, args.username, "password", args.username, args.email, False)
|
||||
userq.password_set(db, args.username, args.password)
|
||||
LOG.info(f"Created user {args.username}")
|
||||
return 0
|
||||
|
||||
|
||||
|
@ -28,7 +29,7 @@ def command_create(args) -> int:
|
|||
def command_promote(args) -> int:
|
||||
"""Make a user a site admin."""
|
||||
db: DbContext = args.get_db()
|
||||
user: Optional[User] = userq.get_user_by_username(db, args.username)
|
||||
user: Optional[User] = userq.from_username(db, args.username)
|
||||
if user is None:
|
||||
args.parser.error("User not found")
|
||||
return -1
|
||||
|
@ -45,7 +46,7 @@ def command_promote(args) -> int:
|
|||
def command_demote(args):
|
||||
"""Revoke a user's site admin status."""
|
||||
db: DbContext = args.get_db()
|
||||
user: Optional[User] = userq.get_user_by_username(db, args.username)
|
||||
user: Optional[User] = userq.from_username(db, args.username)
|
||||
if user is None:
|
||||
args.parser.error("User not found")
|
||||
return -1
|
||||
|
|
|
@ -39,10 +39,10 @@ class DbContext:
|
|||
|
||||
if path and uri:
|
||||
raise ValueError("Only one of path and uri may be specified")
|
||||
db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
|
||||
self.db_uri = uri if uri else f"sqlite:///{os.path.abspath(path)}"
|
||||
|
||||
# Create an engine and enable foreign key constraints in sqlite
|
||||
self.engine = create_engine(db_uri, echo=echo)
|
||||
self.engine = create_engine(self.db_uri, echo=echo)
|
||||
|
||||
@event.listens_for(self.engine, "connect")
|
||||
def set_sqlite_pragma(dbapi_connection, connection_record):
|
||||
|
|
|
@ -4,6 +4,8 @@ import os
|
|||
|
||||
from flask import Flask, g
|
||||
|
||||
import amanuensis.backend.lexicon
|
||||
import amanuensis.backend.user
|
||||
from amanuensis.config import AmanuensisConfig, CommandLineConfig
|
||||
from amanuensis.db import DbContext
|
||||
import amanuensis.server.auth as auth
|
||||
|
@ -58,6 +60,11 @@ def get_app(
|
|||
|
||||
app.template_filter("date")(date_format)
|
||||
|
||||
def include_backend():
|
||||
return {"db": db, "lexiq": amanuensis.backend.lexicon, "userq": amanuensis.backend.user}
|
||||
|
||||
app.context_processor(include_backend)
|
||||
|
||||
# Set up Flask-Login
|
||||
auth.get_login_manager().init_app(app)
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ def get_login_manager() -> LoginManager:
|
|||
user_id = int(user_id_str)
|
||||
except:
|
||||
return None
|
||||
return userq.get_user_by_id(g.db, user_id)
|
||||
return userq.from_id(g.db, user_id)
|
||||
|
||||
login_manager.user_loader(load_user)
|
||||
|
||||
|
@ -58,7 +58,7 @@ def login():
|
|||
# POST with valid data
|
||||
username: str = form.username.data
|
||||
password: str = form.password.data
|
||||
user: User = userq.get_user_by_username(g.db, username)
|
||||
user: User = userq.from_username(g.db, username)
|
||||
if not user or not userq.password_check(g.db, username, password):
|
||||
# Bad creds
|
||||
flash("Login not recognized")
|
||||
|
@ -69,11 +69,11 @@ def login():
|
|||
login_user(user, remember=remember_me)
|
||||
userq.update_logged_in(g.db, username)
|
||||
LOG.info("Logged in user {0.username} ({0.id})".format(user))
|
||||
return redirect(url_for("home.admin"))
|
||||
return redirect(url_for("home.home"))
|
||||
|
||||
|
||||
@bp.get("/logout/")
|
||||
@login_required
|
||||
def logout():
|
||||
logout_user()
|
||||
return redirect(url_for("home.admin"))
|
||||
return redirect(url_for("home.home"))
|
||||
|
|
|
@ -1,43 +1,23 @@
|
|||
from flask import Blueprint, render_template, g
|
||||
|
||||
# from flask import Blueprint, render_template, redirect, url_for, current_app
|
||||
# from flask_login import login_required, current_user
|
||||
|
||||
import amanuensis.backend.user as userq
|
||||
import amanuensis.backend.lexicon as lexiq
|
||||
|
||||
# from amanuensis.config import RootConfigDirectoryContext
|
||||
# from amanuensis.lexicon import create_lexicon, load_all_lexicons
|
||||
# from amanuensis.models import UserModel, ModelFactory
|
||||
# from amanuensis.server.helpers import admin_required
|
||||
|
||||
# from .forms import LexiconCreateForm
|
||||
|
||||
bp = Blueprint("home", __name__, url_prefix="/home", template_folder=".")
|
||||
|
||||
|
||||
# @bp.get("/")
|
||||
# def home():
|
||||
# Show lexicons that are visible to the current user
|
||||
# return "TODO"
|
||||
# user_lexicons = []
|
||||
# public_lexicons = []
|
||||
# for lexicon in load_all_lexicons(root):
|
||||
# if user.uid in lexicon.cfg.join.joined:
|
||||
# user_lexicons.append(lexicon)
|
||||
# elif lexicon.cfg.join.public:
|
||||
# public_lexicons.append(lexicon)
|
||||
# return render_template(
|
||||
# 'home.root.jinja',
|
||||
# user_lexicons=user_lexicons,
|
||||
# public_lexicons=public_lexicons)
|
||||
@bp.get("/")
|
||||
def home():
|
||||
return render_template('home.root.jinja')
|
||||
|
||||
|
||||
@bp.get("/admin/")
|
||||
# @login_required
|
||||
# @admin_required
|
||||
def admin():
|
||||
return render_template("home.admin.jinja", db=g.db, userq=userq, lexiq=lexiq)
|
||||
return render_template("home.admin.jinja", userq=userq, lexiq=lexiq)
|
||||
|
||||
|
||||
# @bp_home.route("/admin/create/", methods=['GET', 'POST'])
|
||||
|
|
|
@ -4,17 +4,17 @@
|
|||
{% block header %}<h2>Amanuensis - Admin Dashboard</h2>{% endblock %}
|
||||
|
||||
{# TODO #}
|
||||
{% block sb_home %}<a href="#{#{ url_for('home.home') }#}">Home</a>{% endblock %}
|
||||
{% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>{% endblock %}
|
||||
{% block sb_create %}<a href="#{#{ url_for('home.admin_create') }#}">Create a lexicon</a>{% endblock %}
|
||||
{% set template_sidebar_rows = [self.sb_home(), self.sb_create()] %}
|
||||
|
||||
{% block main %}
|
||||
<p>Users:</p>
|
||||
{% for user in userq.get_all_users(db) %}
|
||||
{% for user in userq.get_all(db) %}
|
||||
{{ macros.dashboard_user_item(user) }}
|
||||
{% endfor %}
|
||||
<p>Lexicons:</p>
|
||||
{% for lexicon in lexiq.get_all_lexicons(db) %}
|
||||
{% for lexicon in lexiq.get_all(db) %}
|
||||
{{ macros.dashboard_lexicon_item(lexicon) }}
|
||||
{% endfor %}
|
||||
{% endblock %}
|
||||
|
|
|
@ -11,10 +11,16 @@
|
|||
<span style="color:#ff0000">{{ message }}</span><br>
|
||||
{% endfor %}
|
||||
|
||||
{% if current_user.is_authenticated %}
|
||||
{% set joined = lexiq.get_joined(db, current_user.id)|list %}
|
||||
{% else %}
|
||||
{% set joined = [] %}
|
||||
{% endif %}
|
||||
|
||||
{% if current_user.is_authenticated %}
|
||||
<h2>Your games</h2>
|
||||
{% if user_lexicons %}
|
||||
{% for lexicon in user_lexicons %}
|
||||
{% if joined %}
|
||||
{% for lexicon in joined %}
|
||||
{{ macros.dashboard_lexicon_item(lexicon) }}
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
|
@ -22,9 +28,10 @@
|
|||
{% endif %}
|
||||
{% endif %}
|
||||
|
||||
{% set public = lexiq.get_public(db)|reject("in", joined)|list %}
|
||||
<h2>Public games</h2>
|
||||
{% if public_lexicons %}
|
||||
{% for lexicon in public_lexicons %}
|
||||
{% if public %}
|
||||
{% for lexicon in public %}
|
||||
{{ macros.dashboard_lexicon_item(lexicon) }}
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
|
@ -34,7 +41,7 @@
|
|||
{% endblock %}
|
||||
{% set template_content_blocks = [self.main()] %}
|
||||
|
||||
{% if current_user.cfg.is_admin %}
|
||||
{% if current_user.is_site_admin %}
|
||||
{% block admin_dash %}
|
||||
<a href="{{ url_for('home.admin') }}" style="display:block; text-align:center;">Admin dashboard</a>
|
||||
{% endblock %}
|
||||
|
|
|
@ -3,33 +3,35 @@
|
|||
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
|
||||
<p>
|
||||
<span class="dashboard-lexicon-item-title">
|
||||
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">
|
||||
{{ lexicon.full_title }}</a>
|
||||
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
|
||||
</span>
|
||||
[{{ lexicon.status.capitalize() }}]
|
||||
[{{ status.capitalize() }}]
|
||||
</p>
|
||||
<p><i>{{ lexicon.prompt }}</i></p>
|
||||
{% if current_user.is_authenticated %}
|
||||
<p>
|
||||
{# TODO #}
|
||||
{# {%
|
||||
if current_user.uid in lexicon.cfg.join.joined
|
||||
{#-
|
||||
Show detailed player information if the current user is a member of the lexicon or if the current user is a site admin. The filter sequence must be converted to a list because it returns a generator, which is truthy.
|
||||
-#}
|
||||
{%-
|
||||
if lexicon.memberships|map(attribute="user_id")|select("equalto", current_user.id)|list
|
||||
or current_user.is_site_admin
|
||||
%} #}
|
||||
Editor: {#{ lexicon.cfg.editor|user_attr('username') }#} /
|
||||
Players:
|
||||
{# {% for uid in lexicon.cfg.join.joined %} #}
|
||||
{# {{ uid|user_attr('username') }}{% if not loop.last %}, {% endif %} #}
|
||||
{# {% endfor %} #}
|
||||
{# ({{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }}) #}
|
||||
{# {% else %} #}
|
||||
{# Players: {{ lexicon.cfg.join.joined|count }}/{{ lexicon.cfg.join.max_players }} #}
|
||||
{# {% if lexicon.cfg.join.public and lexicon.cfg.join.open %} #}
|
||||
{# / <a href="{{ url_for('lexicon.join', name=lexicon.cfg.name) }}"> #}
|
||||
{# Join game #}
|
||||
{# </a> #}
|
||||
{# {% endif %} #}
|
||||
{# {% endif %} #}
|
||||
-%}
|
||||
Editor: {{
|
||||
lexicon.memberships|selectattr("is_editor")|map(attribute="user")|map(attribute="username")|join(", ")
|
||||
}} / Players: {{
|
||||
lexicon.memberships|map(attribute="user")|map(attribute="username")|join(", ")
|
||||
}} ({{ lexicon.memberships|count }}
|
||||
{%- if lexicon.player_limit is not none -%}
|
||||
/{{ lexicon.player_limit }}
|
||||
{%- endif -%})
|
||||
{%- else -%}
|
||||
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
|
||||
{%-
|
||||
if lexicon.public and lexicon.joinable
|
||||
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
|
||||
{%- endif -%}
|
||||
{%- endif -%}
|
||||
</p>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
|
|
@ -1,22 +1,35 @@
|
|||
"""
|
||||
pytest test fixtures
|
||||
"""
|
||||
import os
|
||||
import pytest
|
||||
import tempfile
|
||||
|
||||
from sqlalchemy.orm.session import close_all_sessions
|
||||
|
||||
from amanuensis.db import DbContext
|
||||
import amanuensis.backend.character as charq
|
||||
import amanuensis.backend.lexicon as lexiq
|
||||
import amanuensis.backend.membership as memq
|
||||
import amanuensis.backend.user as userq
|
||||
from amanuensis.config import AmanuensisConfig
|
||||
from amanuensis.db import DbContext
|
||||
from amanuensis.server import get_app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def db() -> DbContext:
|
||||
"""Provides an initialized database in memory."""
|
||||
db = DbContext(uri="sqlite:///:memory:", echo=False)
|
||||
def db(request) -> DbContext:
|
||||
"""Provides a fully-initialized ephemeral database."""
|
||||
db_fd, db_path = tempfile.mkstemp()
|
||||
db = DbContext(path=db_path, echo=False)
|
||||
db.create_all()
|
||||
|
||||
def db_teardown():
|
||||
close_all_sessions()
|
||||
os.close(db_fd)
|
||||
os.unlink(db_path)
|
||||
|
||||
request.addfinalizer(db_teardown)
|
||||
|
||||
return db
|
||||
|
||||
|
||||
|
@ -128,12 +141,10 @@ def lexicon_with_editor(make):
|
|||
|
||||
class TestConfig(AmanuensisConfig):
|
||||
TESTING = True
|
||||
SECRET_KEY = "secret key"
|
||||
DATABASE_URI = "sqlite:///:memory:"
|
||||
SECRET_KEY = os.urandom(32).hex()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app(db):
|
||||
def app(db: DbContext):
|
||||
"""Provides an application running on top of the test database."""
|
||||
server_app = get_app(TestConfig, db)
|
||||
return server_app
|
||||
return get_app(TestConfig(), db)
|
||||
|
|
Loading…
Reference in New Issue