Add check for duplicate memberships
This commit is contained in:
parent
a21092b7e0
commit
5e051e7e89
@ -2,6 +2,8 @@
|
||||
Membership query interface
|
||||
"""
|
||||
|
||||
from sqlalchemy import select, func
|
||||
|
||||
from amanuensis.db import DbContext, Membership
|
||||
from amanuensis.errors import ArgumentError
|
||||
|
||||
@ -14,7 +16,7 @@ def create(
|
||||
"""
|
||||
Create a new user membership in a lexicon.
|
||||
"""
|
||||
# Quick argument verification
|
||||
# Verify argument types are correct
|
||||
if not isinstance(user_id, int):
|
||||
raise ArgumentError('user_id')
|
||||
if not isinstance(lexicon_id, int):
|
||||
@ -22,6 +24,14 @@ def create(
|
||||
if not isinstance(is_editor, bool):
|
||||
raise ArgumentError('is_editor')
|
||||
|
||||
# Verify user has not already joined lexicon
|
||||
if db(
|
||||
select(func.count(Membership.id))
|
||||
.where(Membership.user_id == user_id)
|
||||
.where(Membership.lexicon_id == lexicon_id)
|
||||
).scalar() > 0:
|
||||
raise ArgumentError('User is already a member of lexicon')
|
||||
|
||||
new_membership = Membership(
|
||||
user_id=user_id,
|
||||
lexicon_id=lexicon_id,
|
||||
|
@ -2,6 +2,8 @@
|
||||
Data model SQL definitions
|
||||
"""
|
||||
import enum
|
||||
import uuid
|
||||
|
||||
from sqlalchemy import (
|
||||
Boolean,
|
||||
Column,
|
||||
@ -17,7 +19,7 @@ from sqlalchemy import (
|
||||
TypeDecorator,
|
||||
)
|
||||
from sqlalchemy.orm import relationship, backref
|
||||
import uuid
|
||||
from sqlalchemy.sql.schema import UniqueConstraint
|
||||
|
||||
from .database import ModelBase
|
||||
|
||||
@ -233,6 +235,9 @@ class Membership(ModelBase):
|
||||
Represents a user's participation in a Lexicon game.
|
||||
"""
|
||||
__tablename__ = 'membership'
|
||||
__table_args__ = (
|
||||
UniqueConstraint('user_id', 'lexicon_id'),
|
||||
)
|
||||
|
||||
###################
|
||||
# Membership keys #
|
||||
|
@ -1,4 +1,9 @@
|
||||
from amanuensis.db import DbContext
|
||||
import pytest
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from amanuensis.db import *
|
||||
from amanuensis.errors import ArgumentError
|
||||
import amanuensis.backend.membership as memq
|
||||
|
||||
|
||||
@ -15,10 +20,22 @@ def test_create_membership(db: DbContext, make_user, make_lexicon):
|
||||
assert mem, 'Failed to create membership'
|
||||
|
||||
# Check that the user and lexicon are mutually visible in the ORM relationships
|
||||
assert new_user.memberships, 'User memberships not updated'
|
||||
assert new_lexicon.memberships, 'Lexicon memberships not updated'
|
||||
assert new_user.memberships[0].lexicon_id == new_lexicon.id
|
||||
assert new_lexicon.memberships[0].user_id == new_user.id
|
||||
assert any(map(lambda mem: mem.lexicon == new_lexicon, new_user.memberships))
|
||||
assert any(map(lambda mem: mem.user == new_user, new_lexicon.memberships))
|
||||
|
||||
# Check that the editor flag was set properly
|
||||
assert new_lexicon.memberships
|
||||
editor = db(
|
||||
select(User)
|
||||
.join(User.memberships)
|
||||
.join(Membership.lexicon)
|
||||
.where(Lexicon.id == new_lexicon.id)
|
||||
.where(Membership.is_editor == True)
|
||||
).scalar_one()
|
||||
assert editor is not None
|
||||
assert isinstance(editor, User)
|
||||
assert editor.id == new_user.id
|
||||
|
||||
# Check that joining twice is not allowed
|
||||
with pytest.raises(ArgumentError):
|
||||
mem2 = memq.create(db, new_user.id, new_lexicon.id, False)
|
||||
assert mem2
|
||||
|
Loading…
Reference in New Issue
Block a user