Compare commits
3 Commits
39ac079614
...
3555558bd9
Author | SHA1 | Date |
---|---|---|
Nikolai | 3555558bd9 | |
Nikolai | 5497ef4b0b | |
Tim Van Baak | 973d60008d |
|
@ -0,0 +1,74 @@
|
||||||
|
"""
|
||||||
|
Index query interface
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from amanuensis.db import DbContext, ArticleIndex, IndexType
|
||||||
|
from amanuensis.errors import ArgumentError
|
||||||
|
|
||||||
|
|
||||||
|
def create(
|
||||||
|
db: DbContext,
|
||||||
|
lexicon_id: int,
|
||||||
|
index_type: IndexType,
|
||||||
|
pattern: str,
|
||||||
|
logical_order: int,
|
||||||
|
display_order: int,
|
||||||
|
capacity: Optional[int],
|
||||||
|
) -> ArticleIndex:
|
||||||
|
"""
|
||||||
|
Create a new index in a lexicon.
|
||||||
|
"""
|
||||||
|
# Verify argument types are correct
|
||||||
|
if not isinstance(lexicon_id, int):
|
||||||
|
raise ArgumentError("lexicon_id")
|
||||||
|
if not isinstance(index_type, IndexType):
|
||||||
|
raise ArgumentError("index_type")
|
||||||
|
if not isinstance(pattern, str):
|
||||||
|
raise ArgumentError("pattern")
|
||||||
|
if not isinstance(logical_order, int):
|
||||||
|
raise ArgumentError("logical_order")
|
||||||
|
if not isinstance(display_order, int):
|
||||||
|
raise ArgumentError("display_order")
|
||||||
|
if capacity is not None and not isinstance(capacity, int):
|
||||||
|
raise ArgumentError("capacity")
|
||||||
|
|
||||||
|
# Verify the pattern is valid for the index type:
|
||||||
|
if index_type == IndexType.CHAR:
|
||||||
|
if len(pattern) < 1:
|
||||||
|
raise ArgumentError(
|
||||||
|
f"Pattern '{pattern}' too short for index type {index_type}"
|
||||||
|
)
|
||||||
|
elif index_type == IndexType.RANGE:
|
||||||
|
range_def = re.match(r"^(.)-(.)$", pattern)
|
||||||
|
if not range_def:
|
||||||
|
raise ArgumentError(f"Pattern '{pattern}' is not a valid range format")
|
||||||
|
start_char, end_char = range_def.group(1), range_def.group(2)
|
||||||
|
if start_char >= end_char:
|
||||||
|
raise ArgumentError(
|
||||||
|
f"Range start '{start_char}' is not before range end '{end_char}'"
|
||||||
|
)
|
||||||
|
elif index_type == IndexType.PREFIX:
|
||||||
|
if len(pattern) < 1:
|
||||||
|
raise ArgumentError(
|
||||||
|
f"Pattern '{pattern}' too short for index type {index_type}"
|
||||||
|
)
|
||||||
|
elif index_type == IndexType.ETC:
|
||||||
|
if len(pattern) < 1:
|
||||||
|
raise ArgumentError(
|
||||||
|
f"Pattern '{pattern}' too short for index type {index_type}"
|
||||||
|
)
|
||||||
|
|
||||||
|
new_index = ArticleIndex(
|
||||||
|
lexicon_id=lexicon_id,
|
||||||
|
index_type=index_type,
|
||||||
|
pattern=pattern,
|
||||||
|
logical_order=logical_order,
|
||||||
|
display_order=display_order,
|
||||||
|
capacity=capacity,
|
||||||
|
)
|
||||||
|
db.session.add(new_index)
|
||||||
|
db.session.commit()
|
||||||
|
return new_index
|
|
@ -5,6 +5,7 @@ Membership query interface
|
||||||
from sqlalchemy import select, func
|
from sqlalchemy import select, func
|
||||||
|
|
||||||
from amanuensis.db import DbContext, Membership
|
from amanuensis.db import DbContext, Membership
|
||||||
|
from amanuensis.db.models import Lexicon
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError
|
||||||
|
|
||||||
|
|
||||||
|
@ -36,6 +37,23 @@ def create(
|
||||||
):
|
):
|
||||||
raise ArgumentError("User is already a member of lexicon")
|
raise ArgumentError("User is already a member of lexicon")
|
||||||
|
|
||||||
|
# get reference to lexicon for next few checks
|
||||||
|
lex: Lexicon = db(
|
||||||
|
select(Lexicon).where(Lexicon.id == lexicon_id)
|
||||||
|
).scalar_one_or_none()
|
||||||
|
|
||||||
|
# Verify lexicon is joinable; current no Lexicons are joinable so this is commented out
|
||||||
|
if not lex.joinable:
|
||||||
|
raise ArgumentError("Can't join: Lexicon is not joinable")
|
||||||
|
|
||||||
|
# Verify lexicon is not full
|
||||||
|
if lex.player_limit:
|
||||||
|
if (
|
||||||
|
db(select(func.count()).where(Membership.lexicon_id == lexicon_id)).scalar()
|
||||||
|
>= lex.player_limit
|
||||||
|
):
|
||||||
|
raise ArgumentError("Can't join: Lexicon is full")
|
||||||
|
|
||||||
new_membership = Membership(
|
new_membership = Membership(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
lexicon_id=lexicon_id,
|
lexicon_id=lexicon_id,
|
||||||
|
|
|
@ -4,9 +4,10 @@ Post query interface
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from sqlalchemy import select, func
|
from sqlalchemy import select
|
||||||
|
|
||||||
from amanuensis.db import DbContext, Post
|
from amanuensis.db import DbContext, Post
|
||||||
|
from amanuensis.db.models import Lexicon
|
||||||
from amanuensis.errors import ArgumentError
|
from amanuensis.errors import ArgumentError
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,6 +35,14 @@ def create(
|
||||||
if not body.strip():
|
if not body.strip():
|
||||||
raise ArgumentError("Post body cannot be empty.")
|
raise ArgumentError("Post body cannot be empty.")
|
||||||
|
|
||||||
|
# Check that the lexicon allows posting
|
||||||
|
if not (
|
||||||
|
db(select(Lexicon).where(Lexicon.id == lexicon_id))
|
||||||
|
.scalar_one_or_none()
|
||||||
|
.allow_post
|
||||||
|
):
|
||||||
|
raise ArgumentError("Lexicon does not allow posting.")
|
||||||
|
|
||||||
new_post = Post(lexicon_id=lexicon_id, user_id=user_id, body=body)
|
new_post = Post(lexicon_id=lexicon_id, user_id=user_id, body=body)
|
||||||
db.session.add(new_post)
|
db.session.add(new_post)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
|
@ -49,7 +49,10 @@ def make_lexicon(db: DbContext):
|
||||||
}
|
}
|
||||||
state["nonce"] += 1
|
state["nonce"] += 1
|
||||||
updated_kwargs = {**default_kwargs, **kwargs}
|
updated_kwargs = {**default_kwargs, **kwargs}
|
||||||
return lexiq.create(db, **updated_kwargs)
|
lex = lexiq.create(db, **updated_kwargs)
|
||||||
|
lex.joinable = True
|
||||||
|
db.session.commit()
|
||||||
|
return lex
|
||||||
|
|
||||||
return lexicon_factory
|
return lexicon_factory
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
from amanuensis.db.models import IndexType
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import amanuensis.backend.index as indq
|
||||||
|
from amanuensis.db import DbContext, Lexicon, User
|
||||||
|
|
||||||
|
from amanuensis.errors import ArgumentError
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_index(db: DbContext, make):
|
||||||
|
"""Test new index creation"""
|
||||||
|
lexicon: Lexicon = make.lexicon()
|
||||||
|
defaults: dict = {
|
||||||
|
"db": db,
|
||||||
|
"lexicon_id": lexicon.id,
|
||||||
|
"index_type": IndexType.ETC,
|
||||||
|
"pattern": "&c.",
|
||||||
|
"logical_order": 0,
|
||||||
|
"display_order": 0,
|
||||||
|
"capacity": 0,
|
||||||
|
}
|
||||||
|
kwargs: dict
|
||||||
|
|
||||||
|
# Character indexes require nonempty patterns
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
|
||||||
|
indq.create(**kwargs)
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "ABC"}
|
||||||
|
assert indq.create(**kwargs)
|
||||||
|
|
||||||
|
# Range indexes must follow the 1-2 format
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.RANGE, "pattern": "ABC"}
|
||||||
|
indq.create(**kwargs)
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.RANGE, "pattern": "A-F"}
|
||||||
|
assert indq.create(**kwargs)
|
||||||
|
|
||||||
|
# Prefix indexes require nonempty patterns
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
|
||||||
|
indq.create(**kwargs)
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "Prefix:"}
|
||||||
|
assert indq.create(**kwargs)
|
||||||
|
|
||||||
|
# Etc indexes require nonempty patterns
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": ""}
|
||||||
|
indq.create(**kwargs)
|
||||||
|
kwargs = {**defaults, "index_type": IndexType.CHAR, "pattern": "&c."}
|
||||||
|
assert indq.create(**kwargs)
|
|
@ -15,7 +15,15 @@ def test_create_membership(db: DbContext, make):
|
||||||
new_lexicon: Lexicon = make.lexicon()
|
new_lexicon: Lexicon = make.lexicon()
|
||||||
assert new_lexicon.id, "Failed to create lexicon"
|
assert new_lexicon.id, "Failed to create lexicon"
|
||||||
|
|
||||||
# Add the user to the lexicon as an editor
|
# Joining doesn't work when joinable is false
|
||||||
|
new_lexicon.joinable = False
|
||||||
|
db.session.commit()
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
memq.create(db, new_user.id, new_lexicon.id, True)
|
||||||
|
|
||||||
|
# Joining works when joinable is true
|
||||||
|
new_lexicon.joinable = True
|
||||||
|
db.session.commit()
|
||||||
mem: Membership = memq.create(db, new_user.id, new_lexicon.id, True)
|
mem: Membership = memq.create(db, new_user.id, new_lexicon.id, True)
|
||||||
assert mem, "Failed to create membership"
|
assert mem, "Failed to create membership"
|
||||||
|
|
||||||
|
@ -38,3 +46,16 @@ def test_create_membership(db: DbContext, make):
|
||||||
# Check that joining twice is not allowed
|
# Check that joining twice is not allowed
|
||||||
with pytest.raises(ArgumentError):
|
with pytest.raises(ArgumentError):
|
||||||
memq.create(db, new_user.id, new_lexicon.id, False)
|
memq.create(db, new_user.id, new_lexicon.id, False)
|
||||||
|
|
||||||
|
# Check that joining full lexicon not allowed
|
||||||
|
new_lexicon.player_limit = 1
|
||||||
|
db.session.commit()
|
||||||
|
two_user: User = make.user()
|
||||||
|
|
||||||
|
with pytest.raises(ArgumentError):
|
||||||
|
memq.create(db, two_user.id, new_lexicon.id, False)
|
||||||
|
|
||||||
|
new_lexicon.player_limit = 2
|
||||||
|
db.session.commit()
|
||||||
|
mem2: Membership = memq.create(db, two_user.id, new_lexicon.id, False)
|
||||||
|
assert mem2, "Failed to join lexicon with open player slots"
|
||||||
|
|
Loading…
Reference in New Issue