Reintegrate lexicon routes and joining

This commit is contained in:
Tim Van Baak 2021-06-29 00:35:23 -07:00
parent 587a70faf5
commit 4401024bf5
15 changed files with 132 additions and 175 deletions

View File

@ -5,7 +5,8 @@ Lexicon query interface
import re
from typing import Sequence, Optional
from sqlalchemy import select, func
from sqlalchemy import select, func, update
from werkzeug.security import generate_password_hash, check_password_hash
from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.errors import ArgumentError, BackendArgumentTypeError
@ -72,6 +73,21 @@ def get_public(db: DbContext) -> Sequence[Lexicon]:
return db(select(Lexicon).where(Lexicon.public == True)).scalars()
def password_check(db: DbContext, lexicon_id: int, password: str) -> bool:
"""Check if a password is correct."""
password_hash: str = db(
select(Lexicon.join_password).where(Lexicon.id == lexicon_id)
).scalar_one()
return check_password_hash(password_hash, password)
def password_set(db: DbContext, lexicon_id: int, new_password: Optional[str]) -> None:
"""Set or clear a lexicon's password."""
password_hash = generate_password_hash(new_password) if new_password else None
db(update(Lexicon).where(Lexicon.id == lexicon_id).values(password=password_hash))
db.session.commit()
def try_from_name(db: DbContext, name: str) -> Optional[Lexicon]:
"""Get a lexicon by its name, or None if no such lexicon was found."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one_or_none()

View File

@ -76,13 +76,6 @@ def get_all(db: DbContext) -> Sequence[User]:
return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)
db(update(User).where(User.username == username).values(password=password_hash))
db.session.commit()
def password_check(db: DbContext, username: str, password: str) -> bool:
"""Check if a password is correct."""
user_password_hash: str = db(
@ -91,6 +84,13 @@ def password_check(db: DbContext, username: str, password: str) -> bool:
return check_password_hash(user_password_hash, password)
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)
db(update(User).where(User.username == username).values(password=password_hash))
db.session.commit()
def try_from_id(db: DbContext, user_id: int) -> Optional[User]:
"""Get a user by the user's id, or None is no such user was found."""
return db(select(User).where(User.id == user_id)).scalar_one_or_none()

View File

@ -11,58 +11,6 @@ from amanuensis.models import LexiconModel, UserModel
from amanuensis.resources import get_stream
def player_can_join_lexicon(
player: UserModel,
lexicon: LexiconModel,
password: str = None) -> bool:
"""
Checks whether the given player can join a lexicon
"""
# Trivial failures
if lexicon is None:
return False
if player is None:
return False
# Can't join if already in the game
if player.uid in lexicon.cfg.join.joined:
return False
# Can't join if the game is closed
if not lexicon.cfg.join.open:
return False
# Can't join if there's no room left
if len(lexicon.cfg.join.joined) >= lexicon.cfg.join.max_players:
return False
# Can't join if the password doesn't check out
if (lexicon.cfg.join.password is not None
and lexicon.cfg.join.password != password):
return False
return True
def add_player_to_lexicon(
player: UserModel,
lexicon: LexiconModel) -> None:
"""
Unconditionally adds a player to a lexicon
"""
# Verify arguments
if lexicon is None:
raise ArgumentError(f'Invalid lexicon: {lexicon}')
if player is None:
raise ArgumentError(f'Invalid player: {player}')
# Idempotently add player
added = False
with lexicon.ctx.edit_config() as cfg:
if player.uid not in cfg.join.joined:
cfg.join.joined.append(player.uid)
added = True
# Log to the lexicon's log
if added:
lexicon.log('Player "{0.cfg.username}" joined ({0.uid})'.format(player))
def player_can_create_character(
player: UserModel,
lexicon: LexiconModel,

View File

@ -2,7 +2,7 @@ from datetime import datetime, timezone
import json
import os
from flask import Flask, g, url_for
from flask import Flask, g, url_for, redirect
from amanuensis.backend import lexiq, userq, memq
from amanuensis.config import AmanuensisConfig, CommandLineConfig
@ -10,6 +10,7 @@ from amanuensis.db import DbContext
from amanuensis.parser import filesafe_title
import amanuensis.server.auth as auth
import amanuensis.server.home as home
import amanuensis.server.lexicon as lexicon
def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str:
@ -80,11 +81,13 @@ def get_app(
# Register blueprints
app.register_blueprint(auth.bp)
app.register_blueprint(home.bp)
app.register_blueprint(lexicon.bp)
def test():
return "Hello, world!"
# Add a root redirect
def root():
return redirect(url_for("home.home"))
app.route("/")(test)
app.route("/")(root)
return app

View File

@ -15,8 +15,9 @@ def lexicon_param(route):
"""
@wraps(route)
def with_lexicon(*args, **kwargs):
db: DbContext = g.db
name: str = kwargs.get('name')
lexicon: Optional[Lexicon] = lexiq.try_from_name(name)
lexicon: Optional[Lexicon] = lexiq.try_from_name(db, name)
if lexicon is None:
flash(f"Couldn't find a lexicon with the name \"{name}\"")
return redirect(url_for("home.home"))

View File

@ -1,9 +1,9 @@
{% extends "page_2col.jinja" %}
{% set lexicon_title = g.lexicon.title %}
{% set lexicon_title = g.lexicon.full_title %}
{% block header %}
<h2>{{ lexicon_title }}</h2>
<p><i>{{ g.lexicon.cfg.prompt }}</i></p>
<p><i>{{ g.lexicon.prompt }}</i></p>
{% endblock %}
{% block sb_logo %}{% endblock %}
@ -11,22 +11,22 @@
{% endblock %}
{% block sb_contents %}<a
{% if current_page == "contents" %}class="current-page"
{% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.cfg.name) }}"
{% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.name) }}"
{% endif %}>Contents</a>{% endblock %}
{% block sb_rules %}<a
{% if current_page == "rules" %}class="current-page"
{% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.cfg.name) }}"
{% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.name) }}"
{% endif %}>Rules</a>{% endblock %}
{% block sb_session %}<a
{% if current_page == "session" %}class="current-page"
{% else %}href="{{ url_for('session.session', name=g.lexicon.cfg.name) }}"
{% else %}href="#{#{ url_for('session.session', name=g.lexicon.name) }#}"
{% endif %}>Session</a>{% endblock %}
{% block sb_stats %}<a
{% if current_page == "statistics" %}class="current-page"
{% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.cfg.name) }}"
{% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.name) }}"
{% endif %}>Statistics</a>{% endblock %}
{% if current_user.uid in g.lexicon.cfg.join.joined %}
{% if current_user.is_authenticated and memq.try_from_ids(g.db, current_user.id, g.lexicon.id) %}
{# self.sb_logo(), #}
{% set template_sidebar_rows = [
self.sb_home(),

View File

@ -1,96 +1,82 @@
from flask import (
Blueprint,
flash,
redirect,
url_for,
g,
render_template,
Markup)
from flask import Blueprint, flash, redirect, url_for, g, render_template, Markup
from flask_login import login_required, current_user
from amanuensis.lexicon import (
player_can_join_lexicon,
add_player_to_lexicon,
sort_by_index_spec)
from amanuensis.models import LexiconModel
from amanuensis.server.helpers import (
lexicon_param,
player_required_if_not_public)
from amanuensis.backend import lexiq, memq
from amanuensis.db import DbContext, Lexicon, User
from amanuensis.errors import ArgumentError
from amanuensis.server.helpers import lexicon_param, player_required_if_not_public
from .forms import LexiconJoinForm
bp_lexicon = Blueprint('lexicon', __name__,
url_prefix='/lexicon/<name>',
template_folder='.')
bp = Blueprint("lexicon", __name__, url_prefix="/lexicon/<name>", template_folder=".")
@bp_lexicon.route("/join/", methods=['GET', 'POST'])
@bp.route("/join/", methods=["GET", "POST"])
@lexicon_param
@login_required
def join(name):
if g.lexicon.status != LexiconModel.PREGAME:
flash("Can't join a game already in progress")
return redirect(url_for('home.home'))
if not g.lexicon.cfg.join.open:
lexicon: Lexicon = g.lexicon
if not lexicon.joinable:
flash("This game isn't open for joining")
return redirect(url_for('home.home'))
return redirect(url_for("home.home"))
form = LexiconJoinForm()
if not form.validate_on_submit():
# GET or POST with invalid form data
return render_template('lexicon.join.jinja', form=form)
return render_template("lexicon.join.jinja", form=form)
# POST with valid data
# If the game is passworded, check password
if (g.lexicon.cfg.join.password
and form.password.data != g.lexicon.cfg.join.password):
db: DbContext = g.db
if lexicon.join_password and not lexiq.password_check(
db, lexicon.id, form.password.data
):
# Bad creds, try again
flash('Incorrect password')
return redirect(url_for('lexicon.join', name=name))
flash("Incorrect password")
return redirect(url_for("lexicon.join", name=name))
# If the password was correct, check if the user can join
if player_can_join_lexicon(current_user, g.lexicon, form.password.data):
add_player_to_lexicon(current_user, g.lexicon)
return redirect(url_for('session.session', name=name))
else:
flash('Could not join game')
return redirect(url_for('home.home', name=name))
user: User = current_user
try:
memq.create(db, user.id, lexicon.id, is_editor=False)
return redirect(url_for("session.session", name=name))
except ArgumentError:
flash("Could not join game")
return redirect(url_for("home.home", name=name))
@bp_lexicon.route('/contents/', methods=['GET'])
@bp.get("/contents/")
@lexicon_param
@player_required_if_not_public
def contents(name):
with g.lexicon.ctx.read('info') as info:
indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list)
for articles in indexed.values():
for i in range(len(articles)):
articles[i] = {
'title': articles[i],
**info.get(articles[i])}
return render_template('lexicon.contents.jinja', indexed=indexed)
# indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list)
# for articles in indexed.values():
# for i in range(len(articles)):
# articles[i] = {
# 'title': articles[i],
# **info.get(articles[i])}
return render_template("lexicon.contents.jinja")
@bp_lexicon.route('/article/<title>')
@bp.get("/article/<title>")
@lexicon_param
@player_required_if_not_public
def article(name, title):
with g.lexicon.ctx.article.read(title) as a:
article = {**a, 'html': Markup(a['html'])}
return render_template('lexicon.article.jinja', article=article)
# article = {**a, 'html': Markup(a['html'])}
return render_template("lexicon.article.jinja")
@bp_lexicon.route('/rules/', methods=['GET'])
@bp.get("/rules/")
@lexicon_param
@player_required_if_not_public
def rules(name):
return render_template('lexicon.rules.jinja')
return render_template("lexicon.rules.jinja")
@bp_lexicon.route('/statistics/', methods=['GET'])
@bp.get("/statistics/")
@lexicon_param
@player_required_if_not_public
def stats(name):
return render_template('lexicon.statistics.jinja')
return render_template("lexicon.statistics.jinja")

View File

@ -4,5 +4,6 @@ from wtforms import StringField, SubmitField
class LexiconJoinForm(FlaskForm):
"""/lexicon/<name>/join/"""
password = StringField('Password')
submit = SubmitField('Submit')
password = StringField("Password")
submit = SubmitField("Submit")

View File

@ -1,12 +1,14 @@
{% extends "lexicon.jinja" %}
{% block title %}Join | {{ lexicon_title }}{% endblock %}
{% block title %}Join | {{ g.lexicon.full_title }}{% endblock %}
{% block main %}
<form id="lexicon-join" action="" method="post" novalidate>
{{ form.hidden_tag() }}
{% if g.lexicon.cfg.join.password %}
{% if g.lexicon.join_password %}
<p>{{ form.password.label }}<br>{{ form.password(size=32) }}</p>
{% else %}
<p>Join {{ g.lexicon.full_title }}?</p>
{% endif %}
<p>{{ form.submit() }}</p>
</form>

View File

@ -3,7 +3,7 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p>
<span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a>
<a href="{{ url_for('lexicon.contents', name=lexicon.name) }}">{{ lexicon.full_title }}</a>
</span>
[{{ status.capitalize() }}]
</p>
@ -29,7 +29,7 @@
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{%-
if lexicon.public and lexicon.joinable
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a>
%} / <a href="{{ url_for('lexicon.join', name=lexicon.name) }}">Join game</a>
{%- endif -%}
{%- endif -%}
</p>

View File

@ -1,4 +1,4 @@
[mypy]
ignore_missing_imports = true
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
; mypy stable doesn't support pyproject.toml yet

View File

@ -22,11 +22,11 @@ amanuensis-cli = "amanuensis.cli:main"
amanuensis-server = "amanuensis.server:run"
[tool.black]
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/lexicon/.*|^/amanuensis/server/session/.*|"
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/session/.*|"
[tool.mypy]
ignore_missing_imports = true
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|"
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
[tool.pytest.ini_options]
addopts = "--show-capture=log"

View File

@ -9,5 +9,5 @@ def test_app_testing(app: Flask):
def test_client(app: Flask):
"""Test that the test client works."""
with app.test_client() as client:
response = client.get("/")
assert b"world" in response.data
response = client.get("/home/")
assert b"Amanuensis" in response.data