Compare commits

...

4 Commits

18 changed files with 422 additions and 468 deletions

View File

@ -5,7 +5,8 @@ Lexicon query interface
import re import re
from typing import Sequence, Optional from typing import Sequence, Optional
from sqlalchemy import select, func from sqlalchemy import select, func, update
from werkzeug.security import generate_password_hash, check_password_hash
from amanuensis.db import DbContext, Lexicon, Membership from amanuensis.db import DbContext, Lexicon, Membership
from amanuensis.errors import ArgumentError, BackendArgumentTypeError from amanuensis.errors import ArgumentError, BackendArgumentTypeError
@ -72,6 +73,21 @@ def get_public(db: DbContext) -> Sequence[Lexicon]:
return db(select(Lexicon).where(Lexicon.public == True)).scalars() return db(select(Lexicon).where(Lexicon.public == True)).scalars()
def password_check(db: DbContext, lexicon_id: int, password: str) -> bool:
"""Check if a password is correct."""
password_hash: str = db(
select(Lexicon.join_password).where(Lexicon.id == lexicon_id)
).scalar_one()
return check_password_hash(password_hash, password)
def password_set(db: DbContext, lexicon_id: int, new_password: Optional[str]) -> None:
"""Set or clear a lexicon's password."""
password_hash = generate_password_hash(new_password) if new_password else None
db(update(Lexicon).where(Lexicon.id == lexicon_id).values(password=password_hash))
db.session.commit()
def try_from_name(db: DbContext, name: str) -> Optional[Lexicon]: def try_from_name(db: DbContext, name: str) -> Optional[Lexicon]:
"""Get a lexicon by its name, or None if no such lexicon was found.""" """Get a lexicon by its name, or None if no such lexicon was found."""
return db(select(Lexicon).where(Lexicon.name == name)).scalar_one_or_none() return db(select(Lexicon).where(Lexicon.name == name)).scalar_one_or_none()

View File

@ -64,3 +64,12 @@ def create(
db.session.add(new_membership) db.session.add(new_membership)
db.session.commit() db.session.commit()
return new_membership return new_membership
def try_from_ids(db: DbContext, user_id: int, lexicon_id: int) -> Membership:
"""Get a membership by the user and lexicon ids, or None if no such membership was found."""
return db(
select(Membership)
.where(Membership.user_id == user_id)
.where(Membership.lexicon_id == lexicon_id)
).scalar_one_or_none()

View File

@ -76,13 +76,6 @@ def get_all(db: DbContext) -> Sequence[User]:
return db(select(User)).scalars() return db(select(User)).scalars()
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)
db(update(User).where(User.username == username).values(password=password_hash))
db.session.commit()
def password_check(db: DbContext, username: str, password: str) -> bool: def password_check(db: DbContext, username: str, password: str) -> bool:
"""Check if a password is correct.""" """Check if a password is correct."""
user_password_hash: str = db( user_password_hash: str = db(
@ -91,6 +84,13 @@ def password_check(db: DbContext, username: str, password: str) -> bool:
return check_password_hash(user_password_hash, password) return check_password_hash(user_password_hash, password)
def password_set(db: DbContext, username: str, new_password: str) -> None:
"""Set a user's password."""
password_hash = generate_password_hash(new_password)
db(update(User).where(User.username == username).values(password=password_hash))
db.session.commit()
def try_from_id(db: DbContext, user_id: int) -> Optional[User]: def try_from_id(db: DbContext, user_id: int) -> Optional[User]:
"""Get a user by the user's id, or None is no such user was found.""" """Get a user by the user's id, or None is no such user was found."""
return db(select(User).where(User.id == user_id)).scalar_one_or_none() return db(select(User).where(User.id == user_id)).scalar_one_or_none()

View File

@ -16,6 +16,7 @@ class BackendArgumentTypeError(ArgumentError):
A call to a backend function was made with a value of an invalid type for the parameter. A call to a backend function was made with a value of an invalid type for the parameter.
Specify the invalid parameter and value as a kwarg. Specify the invalid parameter and value as a kwarg.
""" """
def __init__(self, obj_type, **kwarg): def __init__(self, obj_type, **kwarg):
if not kwarg: if not kwarg:
raise ValueError("Missing kwarg") raise ValueError("Missing kwarg")

View File

@ -11,58 +11,6 @@ from amanuensis.models import LexiconModel, UserModel
from amanuensis.resources import get_stream from amanuensis.resources import get_stream
def player_can_join_lexicon(
player: UserModel,
lexicon: LexiconModel,
password: str = None) -> bool:
"""
Checks whether the given player can join a lexicon
"""
# Trivial failures
if lexicon is None:
return False
if player is None:
return False
# Can't join if already in the game
if player.uid in lexicon.cfg.join.joined:
return False
# Can't join if the game is closed
if not lexicon.cfg.join.open:
return False
# Can't join if there's no room left
if len(lexicon.cfg.join.joined) >= lexicon.cfg.join.max_players:
return False
# Can't join if the password doesn't check out
if (lexicon.cfg.join.password is not None
and lexicon.cfg.join.password != password):
return False
return True
def add_player_to_lexicon(
player: UserModel,
lexicon: LexiconModel) -> None:
"""
Unconditionally adds a player to a lexicon
"""
# Verify arguments
if lexicon is None:
raise ArgumentError(f'Invalid lexicon: {lexicon}')
if player is None:
raise ArgumentError(f'Invalid player: {player}')
# Idempotently add player
added = False
with lexicon.ctx.edit_config() as cfg:
if player.uid not in cfg.join.joined:
cfg.join.joined.append(player.uid)
added = True
# Log to the lexicon's log
if added:
lexicon.log('Player "{0.cfg.username}" joined ({0.uid})'.format(player))
def player_can_create_character( def player_can_create_character(
player: UserModel, player: UserModel,
lexicon: LexiconModel, lexicon: LexiconModel,

View File

@ -2,7 +2,7 @@ from datetime import datetime, timezone
import json import json
import os import os
from flask import Flask, g, url_for from flask import Flask, g, url_for, redirect
from amanuensis.backend import lexiq, userq, memq from amanuensis.backend import lexiq, userq, memq
from amanuensis.config import AmanuensisConfig, CommandLineConfig from amanuensis.config import AmanuensisConfig, CommandLineConfig
@ -10,6 +10,8 @@ from amanuensis.db import DbContext
from amanuensis.parser import filesafe_title from amanuensis.parser import filesafe_title
import amanuensis.server.auth as auth import amanuensis.server.auth as auth
import amanuensis.server.home as home import amanuensis.server.home as home
import amanuensis.server.lexicon as lexicon
import amanuensis.server.session as session
def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str: def date_format(dt: datetime, formatstr="%Y-%m-%d %H:%M:%S%z") -> str:
@ -80,11 +82,14 @@ def get_app(
# Register blueprints # Register blueprints
app.register_blueprint(auth.bp) app.register_blueprint(auth.bp)
app.register_blueprint(home.bp) app.register_blueprint(home.bp)
app.register_blueprint(lexicon.bp)
app.register_blueprint(session.bp)
def test(): # Add a root redirect
return "Hello, world!" def root():
return redirect(url_for("home.home"))
app.route("/")(test) app.route("/")(root)
return app return app

View File

@ -1,107 +1,95 @@
# Standard library imports
from datetime import datetime
from functools import wraps from functools import wraps
from typing import Optional
# Third party imports from flask import g, flash, redirect, url_for
from flask import g, flash, redirect, url_for, current_app
from flask_login import current_user from flask_login import current_user
# Module imports from amanuensis.backend import lexiq, memq
from amanuensis.parser import filesafe_title from amanuensis.db import DbContext, Lexicon, User, Membership
from amanuensis.models import ModelFactory, UserModel, LexiconModel
def register_custom_filters(app):
"""Adds custom filters to the Flask app"""
@app.template_filter("user_attr")
def get_user_attr(uid, attr):
factory: ModelFactory = current_app.config['model_factory']
user: UserModel = factory.user(uid)
val = getattr(user.cfg, attr)
return val
@app.template_filter("articlelink")
def article_link(title):
return url_for(
'lexicon.article',
name=g.lexicon.cfg.name,
title=filesafe_title(title))
@app.context_processor
def lexicon_status():
return dict(
PREGAME=LexiconModel.PREGAME,
ONGOING=LexiconModel.ONGOING,
COMPLETE=LexiconModel.COMPLETE)
def lexicon_param(route): def lexicon_param(route):
"""Wrapper for loading a route's lexicon""" """
@wraps(route) Wrapper for loading a route's lexicon to `g`.
def with_lexicon(**kwargs): This decorator should be applied above any other decorators that reference `g.lexicon`.
name = kwargs.get('name') """
model_factory: ModelFactory = current_app.config['model_factory'] @wraps(route)
g.lexicon = model_factory.lexicon(name) def with_lexicon(*args, **kwargs):
if g.lexicon is None: db: DbContext = g.db
flash(f'Couldn\'t find a lexicon with the name "{name}"') name: str = kwargs.get('name')
return redirect(url_for("home.home")) lexicon: Optional[Lexicon] = lexiq.try_from_name(db, name)
return route(**kwargs) if lexicon is None:
return with_lexicon flash(f"Couldn't find a lexicon with the name \"{name}\"")
return redirect(url_for("home.home"))
g.lexicon = lexicon
return route(*args, **kwargs)
return with_lexicon
def admin_required(route): def admin_required(route):
""" """
Requires the user to be an admin to load this page Restricts a route to users who are site admins.
""" """
@wraps(route) @wraps(route)
def admin_route(*args, **kwargs): def admin_route(*args, **kwargs):
if not current_user.cfg.is_admin: user: User = current_user
flash("You must be an admin to view this page") if not user.is_site_admin:
return redirect(url_for('home.home')) flash("You must be an admin to view this page")
return route(*args, **kwargs) return redirect(url_for('home.home'))
return admin_route return route(*args, **kwargs)
return admin_route
def player_required(route): def player_required(route):
""" """
Requires the user to be a player in the lexicon to load this page Restricts a route to users who are players in the current lexicon.
""" """
@wraps(route) @wraps(route)
def player_route(*args, **kwargs): def player_route(*args, **kwargs):
if current_user.uid not in g.lexicon.cfg.join.joined: db: DbContext = g.db
flash("You must be a player to view this page") user: User = current_user
return (redirect(url_for('lexicon.contents', name=g.lexicon.cfg.name)) lexicon: Lexicon = g.lexicon
if g.lexicon.cfg.join.public mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
else redirect(url_for('home.home'))) if not mem:
return route(*args, **kwargs) flash("You must be a player to view this page")
return player_route if lexicon.public:
return redirect(url_for('lexicon.contents', name=lexicon.name))
else:
return redirect(url_for('home.home'))
return route(*args, **kwargs)
return player_route
def player_required_if_not_public(route): def player_required_if_not_public(route):
""" """
Requires the user to be a player in the lexicon to load this page if the Restricts a route to users who are players in the current lexicon if the lexicon is nonpublic.
lexicon has join.public = false """
""" @wraps(route)
@wraps(route) def player_route(*args, **kwargs):
def player_route(*args, **kwargs): db: DbContext = g.db
if ((not g.lexicon.cfg.join.public) user: User = current_user
and current_user.uid not in g.lexicon.cfg.join.joined): lexicon: Lexicon = g.lexicon
flash("You must be a player to view this page") if not lexicon.public:
return redirect(url_for('home.home')) mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
return route(*args, **kwargs) if not mem:
return player_route flash("You must be a player to view this page")
return redirect(url_for('home.home'))
return route(*args, **kwargs)
return player_route
def editor_required(route): def editor_required(route):
""" """
Requires the user to be the editor of the current lexicon to load this Restricts a route to users who are editors of the current lexicon.
page """
""" @wraps(route)
@wraps(route) def editor_route(*args, **kwargs):
def editor_route(*args, **kwargs): db: DbContext = g.db
if current_user.uid != g.lexicon.cfg.editor: user: User = current_user
flash("You must be the editor to view this page") lexicon: Lexicon = g.lexicon
return redirect(url_for('lexicon.contents', name=g.lexicon.cfg.name)) mem: Optional[Membership] = memq.try_from_ids(db, user.id, lexicon.id)
return route(*args, **kwargs) if not mem.is_editor:
return editor_route flash("You must be the editor to view this page")
return redirect(url_for('lexicon.contents', name=lexicon.name))
return route(*args, **kwargs)
return editor_route

View File

@ -1,44 +1,47 @@
{% extends "page_2col.jinja" %} {% extends "page_2col.jinja" %}
{% set lexicon_title = g.lexicon.title %} {% set lexicon_title = g.lexicon.full_title %}
{% block header %} {% block header %}
<h2>{{ lexicon_title }}</h2> <h2>{{ lexicon_title }}</h2>
<p><i>{{ g.lexicon.cfg.prompt }}</i></p> <p><i>{{ g.lexicon.prompt }}</i></p>
{% endblock %} {% endblock %}
{% block sb_logo %}{% endblock %} {% block sb_logo %}{% endblock %}
{% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a> {% block sb_home %}<a href="{{ url_for('home.home') }}">Home</a>
{% endblock %} {% endblock %}
{% block sb_contents %}<a {% block sb_contents %}<a
{% if current_page == "contents" %}class="current-page" {% if current_page == "contents" %}class="current-page"
{% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.cfg.name) }}" {% else %}href="{{ url_for('lexicon.contents', name=g.lexicon.name) }}"
{% endif %}>Contents</a>{% endblock %} {% endif %}>Contents</a>{% endblock %}
{% block sb_rules %}<a {% block sb_rules %}<a
{% if current_page == "rules" %}class="current-page" {% if current_page == "rules" %}class="current-page"
{% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.cfg.name) }}" {% else %}href="{{ url_for('lexicon.rules', name=g.lexicon.name) }}"
{% endif %}>Rules</a>{% endblock %} {% endif %}>Rules</a>{% endblock %}
{% block sb_session %}<a {% block sb_session %}<a
{% if current_page == "session" %}class="current-page" {% if current_page == "session" %}class="current-page"
{% else %}href="{{ url_for('session.session', name=g.lexicon.cfg.name) }}" {% else %}href="#{#{ url_for('session.session', name=g.lexicon.name) }#}"
{% endif %}>Session</a>{% endblock %} {% endif %}>Session</a>{% endblock %}
{% block sb_stats %}<a {% block sb_stats %}<a
{% if current_page == "statistics" %}class="current-page" {% if current_page == "statistics" %}class="current-page"
{% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.cfg.name) }}" {% else %}href="{{ url_for('lexicon.stats', name=g.lexicon.name) }}"
{% endif %}>Statistics</a>{% endblock %} {% endif %}>Statistics</a>{% endblock %}
{% if current_user.uid in g.lexicon.cfg.join.joined %} {% if current_user.is_authenticated and (
{# self.sb_logo(), #} current_user.is_site_admin
or memq.try_from_ids(g.db, current_user.id, g.lexicon.id)
) %}
{# self.sb_logo(), #}
{% set template_sidebar_rows = [ {% set template_sidebar_rows = [
self.sb_home(), self.sb_home(),
self.sb_contents(), self.sb_contents(),
self.sb_rules(), self.sb_rules(),
self.sb_session(), self.sb_session(),
self.sb_stats()] %} self.sb_stats()] %}
{% else %} {% else %}
{# self.sb_logo(), #} {# self.sb_logo(), #}
{% set template_sidebar_rows = [ {% set template_sidebar_rows = [
self.sb_home(), self.sb_home(),
self.sb_contents(), self.sb_contents(),
self.sb_rules(), self.sb_rules(),
self.sb_stats()] %} self.sb_stats()] %}
{% endif %} {% endif %}

View File

@ -1,96 +1,82 @@
from flask import ( from flask import Blueprint, flash, redirect, url_for, g, render_template, Markup
Blueprint,
flash,
redirect,
url_for,
g,
render_template,
Markup)
from flask_login import login_required, current_user from flask_login import login_required, current_user
from amanuensis.lexicon import ( from amanuensis.backend import lexiq, memq
player_can_join_lexicon, from amanuensis.db import DbContext, Lexicon, User
add_player_to_lexicon, from amanuensis.errors import ArgumentError
sort_by_index_spec) from amanuensis.server.helpers import lexicon_param, player_required_if_not_public
from amanuensis.models import LexiconModel
from amanuensis.server.helpers import (
lexicon_param,
player_required_if_not_public)
from .forms import LexiconJoinForm from .forms import LexiconJoinForm
bp_lexicon = Blueprint('lexicon', __name__, bp = Blueprint("lexicon", __name__, url_prefix="/lexicon/<name>", template_folder=".")
url_prefix='/lexicon/<name>',
template_folder='.')
@bp_lexicon.route("/join/", methods=['GET', 'POST']) @bp.route("/join/", methods=["GET", "POST"])
@lexicon_param @lexicon_param
@login_required @login_required
def join(name): def join(name):
if g.lexicon.status != LexiconModel.PREGAME: lexicon: Lexicon = g.lexicon
flash("Can't join a game already in progress") if not lexicon.joinable:
return redirect(url_for('home.home')) flash("This game isn't open for joining")
return redirect(url_for("home.home"))
if not g.lexicon.cfg.join.open: form = LexiconJoinForm()
flash("This game isn't open for joining")
return redirect(url_for('home.home'))
form = LexiconJoinForm() if not form.validate_on_submit():
# GET or POST with invalid form data
return render_template("lexicon.join.jinja", form=form)
if not form.validate_on_submit(): # POST with valid data
# GET or POST with invalid form data # If the game is passworded, check password
return render_template('lexicon.join.jinja', form=form) db: DbContext = g.db
if lexicon.join_password and not lexiq.password_check(
db, lexicon.id, form.password.data
):
# Bad creds, try again
flash("Incorrect password")
return redirect(url_for("lexicon.join", name=name))
# POST with valid data # If the password was correct, check if the user can join
# If the game is passworded, check password user: User = current_user
if (g.lexicon.cfg.join.password try:
and form.password.data != g.lexicon.cfg.join.password): memq.create(db, user.id, lexicon.id, is_editor=False)
# Bad creds, try again return redirect(url_for("session.session", name=name))
flash('Incorrect password') except ArgumentError:
return redirect(url_for('lexicon.join', name=name)) flash("Could not join game")
# If the password was correct, check if the user can join return redirect(url_for("home.home", name=name))
if player_can_join_lexicon(current_user, g.lexicon, form.password.data):
add_player_to_lexicon(current_user, g.lexicon)
return redirect(url_for('session.session', name=name))
else:
flash('Could not join game')
return redirect(url_for('home.home', name=name))
@bp_lexicon.route('/contents/', methods=['GET']) @bp.get("/contents/")
@lexicon_param @lexicon_param
@player_required_if_not_public @player_required_if_not_public
def contents(name): def contents(name):
with g.lexicon.ctx.read('info') as info: # indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list)
indexed = sort_by_index_spec(info, g.lexicon.cfg.article.index.list) # for articles in indexed.values():
for articles in indexed.values(): # for i in range(len(articles)):
for i in range(len(articles)): # articles[i] = {
articles[i] = { # 'title': articles[i],
'title': articles[i], # **info.get(articles[i])}
**info.get(articles[i])} return render_template("lexicon.contents.jinja")
return render_template('lexicon.contents.jinja', indexed=indexed)
@bp_lexicon.route('/article/<title>') @bp.get("/article/<title>")
@lexicon_param @lexicon_param
@player_required_if_not_public @player_required_if_not_public
def article(name, title): def article(name, title):
with g.lexicon.ctx.article.read(title) as a: # article = {**a, 'html': Markup(a['html'])}
article = {**a, 'html': Markup(a['html'])} return render_template("lexicon.article.jinja")
return render_template('lexicon.article.jinja', article=article)
@bp_lexicon.route('/rules/', methods=['GET']) @bp.get("/rules/")
@lexicon_param @lexicon_param
@player_required_if_not_public @player_required_if_not_public
def rules(name): def rules(name):
return render_template('lexicon.rules.jinja') return render_template("lexicon.rules.jinja")
@bp_lexicon.route('/statistics/', methods=['GET']) @bp.get("/statistics/")
@lexicon_param @lexicon_param
@player_required_if_not_public @player_required_if_not_public
def stats(name): def stats(name):
return render_template('lexicon.statistics.jinja') return render_template("lexicon.statistics.jinja")

View File

@ -3,6 +3,7 @@ from wtforms import StringField, SubmitField
class LexiconJoinForm(FlaskForm): class LexiconJoinForm(FlaskForm):
"""/lexicon/<name>/join/""" """/lexicon/<name>/join/"""
password = StringField('Password')
submit = SubmitField('Submit') password = StringField("Password")
submit = SubmitField("Submit")

View File

@ -14,14 +14,14 @@
{% block citations %} {% block citations %}
<p> <p>
{% for citation in article.cites %} {% for citation in article.cites %}
<a href="{{ citation|articlelink }}">{{ citation }}</a>{% if not loop.last %} / {% endif %} <a href="{{ citation|articlelink }}">{{ citation }}</a>{% if not loop.last %} / {% endif %}
{% endfor %} {% endfor %}
</p> </p>
<p> <p>
{% for citation in article.citedby %} {% for citation in article.citedby %}
<a href="{{ citation|articlelink }}">{{ citation }}</a>{% if not loop.last %} / {% endif %} <a href="{{ citation|articlelink }}">{{ citation }}</a>{% if not loop.last %} / {% endif %}
{% endfor %} {% endfor %}
</p> </p>
{% endblock %} {% endblock %}

View File

@ -14,7 +14,7 @@
<ul> <ul>
{% for article in indexed[index] %} {% for article in indexed[index] %}
<li><a href="{{ article.title|articlelink }}" class="{{ 'phantom' if not article.character else '' }}"> <li><a href="{{ article.title|articlelink }}" class="{{ 'phantom' if not article.character else '' }}">
{{ article.title }} {{ article.title }}
</a></li> </a></li>
{% endfor %} {% endfor %}
</ul> </ul>

View File

@ -1,14 +1,16 @@
{% extends "lexicon.jinja" %} {% extends "lexicon.jinja" %}
{% block title %}Join | {{ lexicon_title }}{% endblock %} {% block title %}Join | {{ g.lexicon.full_title }}{% endblock %}
{% block main %} {% block main %}
<form id="lexicon-join" action="" method="post" novalidate> <form id="lexicon-join" action="" method="post" novalidate>
{{ form.hidden_tag() }} {{ form.hidden_tag() }}
{% if g.lexicon.cfg.join.password %} {% if g.lexicon.join_password %}
<p>{{ form.password.label }}<br>{{ form.password(size=32) }}</p> <p>{{ form.password.label }}<br>{{ form.password(size=32) }}</p>
{% endif %} {% else %}
<p>{{ form.submit() }}</p> <p>Join {{ g.lexicon.full_title }}?</p>
{% endif %}
<p>{{ form.submit() }}</p>
</form> </form>
{% for message in get_flashed_messages() %} {% for message in get_flashed_messages() %}

View File

@ -3,7 +3,7 @@
<div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}"> <div class="dashboard-lexicon-item dashboard-lexicon-{{ status }}">
<p> <p>
<span class="dashboard-lexicon-item-title"> <span class="dashboard-lexicon-item-title">
<a href="#{#{ url_for('lexicon.contents', name=lexicon.cfg.name) }#}">{{ lexicon.full_title }}</a> <a href="{{ url_for('lexicon.contents', name=lexicon.name) }}">{{ lexicon.full_title }}</a>
</span> </span>
[{{ status.capitalize() }}] [{{ status.capitalize() }}]
</p> </p>
@ -29,7 +29,7 @@
Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%} Players: {{ lexicon.memberships|count }}{% if lexicon.player_limit is not none %} / {{ lexicon.player_limit }}{% endif -%}
{%- {%-
if lexicon.public and lexicon.joinable if lexicon.public and lexicon.joinable
%} / <a href="#{#{ url_for('lexicon.join', name=lexicon.cfg.name) }#}">Join game</a> %} / <a href="{{ url_for('lexicon.join', name=lexicon.name) }}">Join game</a>
{%- endif -%} {%- endif -%}
{%- endif -%} {%- endif -%}
</p> </p>

View File

@ -1,245 +1,240 @@
from flask import ( from flask import (
Blueprint, Blueprint,
render_template, )
url_for,
redirect,
g,
flash,
request,
Markup)
from flask_login import current_user
from amanuensis.lexicon import ( # from flask_login import current_user
attempt_publish,
get_player_characters,
create_character_in_lexicon,
get_draft)
from amanuensis.models import LexiconModel
from amanuensis.parser import parse_raw_markdown
from amanuensis.server.helpers import (
lexicon_param,
player_required,
editor_required)
from .forms import ( # from amanuensis.lexicon import (
LexiconCharacterForm, # attempt_publish,
LexiconReviewForm, # get_player_characters,
LexiconPublishTurnForm, # create_character_in_lexicon,
LexiconConfigForm) # get_draft)
# from amanuensis.models import LexiconModel
# from amanuensis.parser import parse_raw_markdown
# from amanuensis.server.helpers import (
# lexicon_param,
# player_required,
# editor_required)
from .editor import load_editor, new_draft, update_draft, PreviewHtmlRenderer # from .forms import (
# LexiconCharacterForm,
# LexiconReviewForm,
# LexiconPublishTurnForm,
# LexiconConfigForm)
# from .editor import load_editor, new_draft, update_draft, PreviewHtmlRenderer
bp_session = Blueprint('session', __name__, bp = Blueprint(
url_prefix='/lexicon/<name>/session', "session", __name__, url_prefix="/lexicon/<name>/session", template_folder="."
template_folder='.') )
@bp_session.route('/', methods=['GET', 'POST']) # @bp_session.route('/', methods=['GET', 'POST'])
@lexicon_param # @lexicon_param
@player_required # @player_required
def session(name): # def session(name):
drafts = [] # drafts = []
approved = [] # approved = []
draft_ctx = g.lexicon.ctx.draft # draft_ctx = g.lexicon.ctx.draft
draft_filenames = draft_ctx.ls() # draft_filenames = draft_ctx.ls()
for draft_filename in draft_filenames: # for draft_filename in draft_filenames:
with draft_ctx.read(draft_filename) as draft: # with draft_ctx.read(draft_filename) as draft:
if draft.status.ready and not draft.status.approved: # if draft.status.ready and not draft.status.approved:
drafts.append(draft) # drafts.append(draft)
if draft.status.approved: # if draft.status.approved:
approved.append(draft) # approved.append(draft)
characters = [] # characters = []
for char in g.lexicon.cfg.character.values(): # for char in g.lexicon.cfg.character.values():
if char.player == current_user.uid: # if char.player == current_user.uid:
characters.append(char) # characters.append(char)
form = LexiconPublishTurnForm() # form = LexiconPublishTurnForm()
if form.validate_on_submit(): # if form.validate_on_submit():
if attempt_publish(g.lexicon): # if attempt_publish(g.lexicon):
return redirect(url_for('lexicon.contents', name=name)) # return redirect(url_for('lexicon.contents', name=name))
else: # else:
flash('Publish failed') # flash('Publish failed')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
return render_template( # return render_template(
'session.root.jinja', # 'session.root.jinja',
ready_articles=drafts, # ready_articles=drafts,
approved_articles=approved, # approved_articles=approved,
characters=characters, # characters=characters,
publish_form=form) # publish_form=form)
def edit_character(name, form, character): # def edit_character(name, form, character):
if not form.is_submitted(): # if not form.is_submitted():
# GET, populate with values # # GET, populate with values
return render_template( # return render_template(
'session.character.jinja', form=form.for_character(character)) # 'session.character.jinja', form=form.for_character(character))
if not form.validate(): # if not form.validate():
# POST with invalid data, return unchanged # # POST with invalid data, return unchanged
return render_template('session.character.jinja', form=form) # return render_template('session.character.jinja', form=form)
# POST with valid data, update character # # POST with valid data, update character
with g.lexicon.ctx.edit_config() as cfg: # with g.lexicon.ctx.edit_config() as cfg:
char = cfg.character[character.cid] # char = cfg.character[character.cid]
char.name = form.characterName.data # char.name = form.characterName.data
char.signature = form.defaultSignature.data # char.signature = form.defaultSignature.data
flash('Character updated') # flash('Character updated')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
def create_character(name: str, form: LexiconCharacterForm): # def create_character(name: str, form: LexiconCharacterForm):
# Characters can't be created if the game has already started # # Characters can't be created if the game has already started
if g.lexicon.status != LexiconModel.PREGAME: # if g.lexicon.status != LexiconModel.PREGAME:
flash("Characters can't be added after the game has started") # flash("Characters can't be added after the game has started")
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
# Characters can't be created beyond the per-player limit # # Characters can't be created beyond the per-player limit
player_characters = get_player_characters(g.lexicon, current_user.uid) # player_characters = get_player_characters(g.lexicon, current_user.uid)
if len(list(player_characters)) >= g.lexicon.cfg.join.chars_per_player: # if len(list(player_characters)) >= g.lexicon.cfg.join.chars_per_player:
flash("Can't create more characters") # flash("Can't create more characters")
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
if not form.is_submitted(): # if not form.is_submitted():
# GET, populate with default values # # GET, populate with default values
return render_template( # return render_template(
'session.character.jinja', form=form.for_new()) # 'session.character.jinja', form=form.for_new())
if not form.validate(): # if not form.validate():
# POST with invalid data, return unchanged # # POST with invalid data, return unchanged
return render_template('session.character.jinja', form=form) # return render_template('session.character.jinja', form=form)
# POST with valid data, create character # # POST with valid data, create character
char_name = form.characterName.data # char_name = form.characterName.data
cid = create_character_in_lexicon(current_user, g.lexicon, char_name) # cid = create_character_in_lexicon(current_user, g.lexicon, char_name)
with g.lexicon.ctx.edit_config() as cfg: # with g.lexicon.ctx.edit_config() as cfg:
cfg.character[cid].signature = form.defaultSignature.data # cfg.character[cid].signature = form.defaultSignature.data
flash('Character created') # flash('Character created')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
@bp_session.route('/character/', methods=['GET', 'POST']) # @bp_session.route('/character/', methods=['GET', 'POST'])
@lexicon_param # @lexicon_param
@player_required # @player_required
def character(name): # def character(name):
form = LexiconCharacterForm() # form = LexiconCharacterForm()
cid = request.args.get('cid') # cid = request.args.get('cid')
if not cid: # if not cid:
# No character specified, creating a new character # # No character specified, creating a new character
return create_character(name, form) # return create_character(name, form)
character = g.lexicon.cfg.character.get(cid) # character = g.lexicon.cfg.character.get(cid)
if not character: # if not character:
# Bad character id, abort # # Bad character id, abort
flash('Character not found') # flash('Character not found')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
if current_user.uid not in (character.player, g.lexicon.cfg.editor): # if current_user.uid not in (character.player, g.lexicon.cfg.editor):
# Only its owner and the editor can edit a character # # Only its owner and the editor can edit a character
flash('Access denied') # flash('Access denied')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
# Edit allowed # # Edit allowed
return edit_character(name, form, character) # return edit_character(name, form, character)
@bp_session.route('/settings/', methods=['GET', 'POST']) # @bp_session.route('/settings/', methods=['GET', 'POST'])
@lexicon_param # @lexicon_param
@editor_required # @editor_required
def settings(name): # def settings(name):
form: LexiconConfigForm = LexiconConfigForm(g.lexicon) # form: LexiconConfigForm = LexiconConfigForm(g.lexicon)
if not form.is_submitted(): # if not form.is_submitted():
# GET # # GET
form.load(g.lexicon) # form.load(g.lexicon)
return render_template('session.settings.jinja', form=form) # return render_template('session.settings.jinja', form=form)
if not form.validate(): # if not form.validate():
# POST with invalid data # # POST with invalid data
flash('Validation error') # flash('Validation error')
return render_template('session.settings.jinja', form=form) # return render_template('session.settings.jinja', form=form)
# POST with valid data # # POST with valid data
form.save(g.lexicon) # form.save(g.lexicon)
flash('Settings updated') # flash('Settings updated')
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
@bp_session.route('/review/', methods=['GET', 'POST']) # @bp_session.route('/review/', methods=['GET', 'POST'])
@lexicon_param # @lexicon_param
@editor_required # @editor_required
def review(name): # def review(name):
# Ensure the article exists # # Ensure the article exists
draft = get_draft(g.lexicon, request.args.get('aid')) # draft = get_draft(g.lexicon, request.args.get('aid'))
if not draft: # if not draft:
flash("Unknown article id") # flash("Unknown article id")
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
draft_filename = f'{draft.character}.{draft.aid}' # draft_filename = f'{draft.character}.{draft.aid}'
with g.lexicon.ctx.draft.edit(draft_filename) as draft: # with g.lexicon.ctx.draft.edit(draft_filename) as draft:
# If the article was unreadied in the meantime, abort # # If the article was unreadied in the meantime, abort
if not draft.status.ready: # if not draft.status.ready:
flash("Article was rescinded") # flash("Article was rescinded")
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
parsed_draft = parse_raw_markdown(draft.contents) # parsed_draft = parse_raw_markdown(draft.contents)
preview = parsed_draft.render(PreviewHtmlRenderer(g.lexicon)) # preview = parsed_draft.render(PreviewHtmlRenderer(g.lexicon))
rendered_html = preview.contents # rendered_html = preview.contents
citations = preview.citations # citations = preview.citations
# If the article was already reviewed, just preview it # # If the article was already reviewed, just preview it
if draft.status.approved: # if draft.status.approved:
return render_template( # return render_template(
"session.review.jinja", # "session.review.jinja",
article_html=Markup(rendered_html), # article_html=Markup(rendered_html),
citations=citations) # citations=citations)
# Otherwise, prepare the review form # # Otherwise, prepare the review form
form = LexiconReviewForm() # form = LexiconReviewForm()
if not form.validate_on_submit(): # if not form.validate_on_submit():
# GET or POST with invalid data # # GET or POST with invalid data
return render_template( # return render_template(
"session.review.jinja", # "session.review.jinja",
form=form, # form=form,
article_html=Markup(rendered_html), # article_html=Markup(rendered_html),
citations=citations) # citations=citations)
# POST with valid data # # POST with valid data
if form.approved.data == LexiconReviewForm.REJECTED: # if form.approved.data == LexiconReviewForm.REJECTED:
draft.status.ready = False # draft.status.ready = False
draft.status.approved = False # draft.status.approved = False
g.lexicon.log(f"Article '{draft.title}' rejected ({draft.aid})") # g.lexicon.log(f"Article '{draft.title}' rejected ({draft.aid})")
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
else: # else:
draft.status.ready = True # draft.status.ready = True
draft.status.approved = True # draft.status.approved = True
g.lexicon.log(f"Article '{draft.title}' approved ({draft.aid})") # g.lexicon.log(f"Article '{draft.title}' approved ({draft.aid})")
# Draft was approved, check for asap publishing # # Draft was approved, check for asap publishing
if g.lexicon.cfg.publish.asap: # if g.lexicon.cfg.publish.asap:
if attempt_publish(g.lexicon): # if attempt_publish(g.lexicon):
redirect(url_for('lexicon.contents', name=name)) # redirect(url_for('lexicon.contents', name=name))
return redirect(url_for('session.session', name=name)) # return redirect(url_for('session.session', name=name))
@bp_session.route('/editor/', methods=['GET']) # @bp_session.route('/editor/', methods=['GET'])
@lexicon_param # @lexicon_param
@player_required # @player_required
def editor(name): # def editor(name):
lexicon: LexiconModel = g.lexicon # lexicon: LexiconModel = g.lexicon
aid: str = request.args.get('aid') # aid: str = request.args.get('aid')
return load_editor(lexicon, aid) # return load_editor(lexicon, aid)
@bp_session.route('/editor/new', methods=['GET']) # @bp_session.route('/editor/new', methods=['GET'])
@lexicon_param # @lexicon_param
@player_required # @player_required
def editor_new(name): # def editor_new(name):
lexicon: LexiconModel = g.lexicon # lexicon: LexiconModel = g.lexicon
cid: str = request.args.get('cid') # cid: str = request.args.get('cid')
return new_draft(lexicon, cid) # return new_draft(lexicon, cid)
@bp_session.route('/editor/update', methods=['POST']) # @bp_session.route('/editor/update', methods=['POST'])
@lexicon_param # @lexicon_param
@player_required # @player_required
def editor_update(name): # def editor_update(name):
lexicon: LexiconModel = g.lexicon # lexicon: LexiconModel = g.lexicon
article_json = request.json['article'] # article_json = request.json['article']
return update_draft(lexicon, article_json) # return update_draft(lexicon, article_json)

View File

@ -1,4 +1,4 @@
[mypy] [mypy]
ignore_missing_imports = true ignore_missing_imports = true
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|" exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
; mypy stable doesn't support pyproject.toml yet ; mypy stable doesn't support pyproject.toml yet

View File

@ -22,11 +22,11 @@ amanuensis-cli = "amanuensis.cli:main"
amanuensis-server = "amanuensis.server:run" amanuensis-server = "amanuensis.server:run"
[tool.black] [tool.black]
extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/lexicon/.*|^/amanuensis/server/session/.*|" extend-exclude = "^/amanuensis/lexicon/.*|^/amanuensis/server/[^/]*py|^/amanuensis/server/session/.*|"
[tool.mypy] [tool.mypy]
ignore_missing_imports = true ignore_missing_imports = true
exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/lexicon/.*|amanuensis/server/session/.*|" exclude = "|amanuensis/lexicon/.*|amanuensis/server/.*|amanuensis/server/session/.*|"
[tool.pytest.ini_options] [tool.pytest.ini_options]
addopts = "--show-capture=log" addopts = "--show-capture=log"

View File

@ -9,5 +9,5 @@ def test_app_testing(app: Flask):
def test_client(app: Flask): def test_client(app: Flask):
"""Test that the test client works.""" """Test that the test client works."""
with app.test_client() as client: with app.test_client() as client:
response = client.get("/") response = client.get("/home/")
assert b"world" in response.data assert b"Amanuensis" in response.data