Compare commits

...

3 Commits

6 changed files with 178 additions and 3 deletions

View File

@ -0,0 +1,74 @@
"""
Index query interface
"""
import re
from typing import Optional
from amanuensis.db import DbContext, ArticleIndex, IndexType
from amanuensis.errors import ArgumentError
def create(
db: DbContext,
lexicon_id: int,
index_type: IndexType,
pattern: str,
logical_order: int,
display_order: int,
capacity: Optional[int],
) -> ArticleIndex:
"""
Create a new index in a lexicon.
"""
# Verify argument types are correct
if not isinstance(lexicon_id, int):
raise ArgumentError("lexicon_id")
if not isinstance(index_type, IndexType):
raise ArgumentError("index_type")
if not isinstance(pattern, str):
raise ArgumentError("pattern")
if not isinstance(logical_order, int):
raise ArgumentError("logical_order")
if not isinstance(display_order, int):
raise ArgumentError("display_order")
if capacity is not None and not isinstance(capacity, int):
raise ArgumentError("capacity")
# Verify the pattern is valid for the index type:
if index_type == IndexType.CHAR:
if len(pattern) < 1:
raise ArgumentError(
f"Pattern '{pattern}' too short for index type {index_type}"
)
elif index_type == IndexType.RANGE:
range_def = re.match(r"^(.)-(.)$", pattern)
if not range_def:
raise ArgumentError(f"Pattern '{pattern}' is not a valid range format")
start_char, end_char = range_def.group(1), range_def.group(2)
if start_char >= end_char:
raise ArgumentError(
f"Range start '{start_char}' is not before range end '{end_char}'"
)
elif index_type == IndexType.PREFIX:
if len(pattern) < 1:
raise ArgumentError(
f"Pattern '{pattern}' too short for index type {index_type}"
)
elif index_type == IndexType.ETC:
if len(pattern) < 1:
raise ArgumentError(
f"Pattern '{pattern}' too short for index type {index_type}"
)
new_index = ArticleIndex(
lexicon_id=lexicon_id,
index_type=index_type,
pattern=pattern,
logical_order=logical_order,
display_order=display_order,
capacity=capacity,
)
db.session.add(new_index)
db.session.commit()
return new_index

View File

@ -5,6 +5,7 @@ Membership query interface
from sqlalchemy import select, func from sqlalchemy import select, func
from amanuensis.db import DbContext, Membership from amanuensis.db import DbContext, Membership
from amanuensis.db.models import Lexicon
from amanuensis.errors import ArgumentError from amanuensis.errors import ArgumentError
@ -36,6 +37,23 @@ def create(
): ):
raise ArgumentError("User is already a member of lexicon") raise ArgumentError("User is already a member of lexicon")
# get reference to lexicon for next few checks
lex: Lexicon = db(
select(Lexicon).where(Lexicon.id == lexicon_id)
).scalar_one_or_none()
# Verify lexicon is joinable; current no Lexicons are joinable so this is commented out
if not lex.joinable:
raise ArgumentError("Can't join: Lexicon is not joinable")
# Verify lexicon is not full
if lex.player_limit:
if (
db(select(func.count()).where(Membership.lexicon_id == lexicon_id)).scalar()
>= lex.player_limit
):
raise ArgumentError("Can't join: Lexicon is full")
new_membership = Membership( new_membership = Membership(
user_id=user_id, user_id=user_id,
lexicon_id=lexicon_id, lexicon_id=lexicon_id,

View File

@ -4,9 +4,10 @@ Post query interface
import re import re
from sqlalchemy import select, func from sqlalchemy import select
from amanuensis.db import DbContext, Post from amanuensis.db import DbContext, Post
from amanuensis.db.models import Lexicon
from amanuensis.errors import ArgumentError from amanuensis.errors import ArgumentError
@ -34,6 +35,14 @@ def create(
if not body.strip(): if not body.strip():
raise ArgumentError("Post body cannot be empty.") raise ArgumentError("Post body cannot be empty.")
# Check that the lexicon allows posting
if not (
db(select(Lexicon).where(Lexicon.id == lexicon_id))
.scalar_one_or_none()
.allow_post
):
raise ArgumentError("Lexicon does not allow posting.")
new_post = Post(lexicon_id=lexicon_id, user_id=user_id, body=body) new_post = Post(lexicon_id=lexicon_id, user_id=user_id, body=body)
db.session.add(new_post) db.session.add(new_post)
db.session.commit() db.session.commit()

View File

@ -49,7 +49,10 @@ def make_lexicon(db: DbContext):
} }
state["nonce"] += 1 state["nonce"] += 1
updated_kwargs = {**default_kwargs, **kwargs} updated_kwargs = {**default_kwargs, **kwargs}
return lexiq.create(db, **updated_kwargs) lex = lexiq.create(db, **updated_kwargs)
lex.joinable = True
db.session.commit()
return lex
return lexicon_factory return lexicon_factory

50
tests/test_index.py Normal file
View File

@ -0,0 +1,50 @@
from amanuensis.db.models import IndexType
import pytest
import amanuensis.backend.index as indq
from amanuensis.db import DbContext, Lexicon, User
from amanuensis.errors import ArgumentError
def test_create_index(db: DbContext, make):
"""Test new index creation"""
lexicon: Lexicon = make.lexicon()
defaults: dict = {
"db": db,
"lexicon_id": lexicon.id,
"index_type": IndexType.ETC,
"pattern": "&c.",
"logical_order": 0,
"display_order": 0,
"capacity": 0,
}
kwargs: dict
# Character indexes require nonempty patterns
with pytest.raises(ArgumentError):
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
indq.create(**kwargs)
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "ABC"}
assert indq.create(**kwargs)
# Range indexes must follow the 1-2 format
with pytest.raises(ArgumentError):
kwargs = {**defaults, "index_type": IndexType.RANGE, "pattern": "ABC"}
indq.create(**kwargs)
kwargs = {**defaults, "index_type": IndexType.RANGE, "pattern": "A-F"}
assert indq.create(**kwargs)
# Prefix indexes require nonempty patterns
with pytest.raises(ArgumentError):
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
indq.create(**kwargs)
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "Prefix:"}
assert indq.create(**kwargs)
# Etc indexes require nonempty patterns
with pytest.raises(ArgumentError):
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
indq.create(**kwargs)
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "&c."}
assert indq.create(**kwargs)

View File

@ -15,7 +15,15 @@ def test_create_membership(db: DbContext, make):
new_lexicon: Lexicon = make.lexicon() new_lexicon: Lexicon = make.lexicon()
assert new_lexicon.id, "Failed to create lexicon" assert new_lexicon.id, "Failed to create lexicon"
# Add the user to the lexicon as an editor # Joining doesn't work when joinable is false
new_lexicon.joinable = False
db.session.commit()
with pytest.raises(ArgumentError):
memq.create(db, new_user.id, new_lexicon.id, True)
# Joining works when joinable is true
new_lexicon.joinable = True
db.session.commit()
mem: Membership = memq.create(db, new_user.id, new_lexicon.id, True) mem: Membership = memq.create(db, new_user.id, new_lexicon.id, True)
assert mem, "Failed to create membership" assert mem, "Failed to create membership"
@ -38,3 +46,16 @@ def test_create_membership(db: DbContext, make):
# Check that joining twice is not allowed # Check that joining twice is not allowed
with pytest.raises(ArgumentError): with pytest.raises(ArgumentError):
memq.create(db, new_user.id, new_lexicon.id, False) memq.create(db, new_user.id, new_lexicon.id, False)
# Check that joining full lexicon not allowed
new_lexicon.player_limit = 1
db.session.commit()
two_user: User = make.user()
with pytest.raises(ArgumentError):
memq.create(db, two_user.id, new_lexicon.id, False)
new_lexicon.player_limit = 2
db.session.commit()
mem2: Membership = memq.create(db, two_user.id, new_lexicon.id, False)
assert mem2, "Failed to join lexicon with open player slots"