Compare commits
No commits in common. "8a3893f33d294d68905a08e512f3d4ab67d00a78" and "fbf9b59456ca3ef15d845d3ae1e9665387e73f3d" have entirely different histories.
8a3893f33d
...
fbf9b59456
|
@ -1,49 +0,0 @@
|
||||||
"""
|
|
||||||
User query interface
|
|
||||||
"""
|
|
||||||
|
|
||||||
import re
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from amanuensis.db import DbContext, User
|
|
||||||
from amanuensis.errors import ArgumentError
|
|
||||||
|
|
||||||
|
|
||||||
def create_user(
|
|
||||||
db: DbContext,
|
|
||||||
username: str,
|
|
||||||
password: str,
|
|
||||||
display_name: str,
|
|
||||||
email: str,
|
|
||||||
is_site_admin: bool) -> User:
|
|
||||||
"""
|
|
||||||
Create a new user.
|
|
||||||
"""
|
|
||||||
# Verify username
|
|
||||||
if len(username) < 3 or len(username) > 32:
|
|
||||||
raise ArgumentError('Username must be between 3 and 32 characters')
|
|
||||||
if re.match(r'^[0-9-_]*$', username):
|
|
||||||
raise ArgumentError('Username must contain a letter')
|
|
||||||
if not re.match(r'^[A-Za-z0-9-_]*$', username):
|
|
||||||
raise ArgumentError('Username may only contain alphanumerics, dash, and underscore')
|
|
||||||
# Verify password
|
|
||||||
if not password:
|
|
||||||
raise ArgumentError('Password must be provided')
|
|
||||||
# If display name is not provided, use the username
|
|
||||||
if not display_name.strip():
|
|
||||||
display_name = username
|
|
||||||
|
|
||||||
# Query the db to make sure the username isn't taken
|
|
||||||
if db.session.query(User.username == username).count() > 0:
|
|
||||||
raise ArgumentError('Username is already taken')
|
|
||||||
|
|
||||||
new_user = User(
|
|
||||||
username=username,
|
|
||||||
password=password,
|
|
||||||
display_name=display_name,
|
|
||||||
email=email,
|
|
||||||
is_site_admin=is_site_admin,
|
|
||||||
)
|
|
||||||
db.session.add(new_user)
|
|
||||||
db.session.commit()
|
|
||||||
return new_user
|
|
|
@ -1,10 +1,32 @@
|
||||||
"""
|
from sqlalchemy import create_engine, MetaData, event, TypeDecorator, CHAR
|
||||||
Database connection setup
|
|
||||||
"""
|
|
||||||
from sqlalchemy import create_engine, MetaData, event
|
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
from sqlalchemy.orm import scoped_session
|
from sqlalchemy.orm import scoped_session
|
||||||
from sqlalchemy.orm import sessionmaker
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
import sqlite3
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
|
||||||
|
class Uuid(TypeDecorator):
|
||||||
|
"""
|
||||||
|
A uuid backed by a char(32) field in sqlite.
|
||||||
|
"""
|
||||||
|
impl = CHAR(32)
|
||||||
|
|
||||||
|
def process_bind_param(self, value, dialect):
|
||||||
|
if value is None:
|
||||||
|
return value
|
||||||
|
elif not isinstance(value, uuid.UUID):
|
||||||
|
return f'{uuid.UUID(value).int:32x}'
|
||||||
|
else:
|
||||||
|
return f'{value.int:32x}'
|
||||||
|
|
||||||
|
def process_result_value(self, value, dialect):
|
||||||
|
if value is None:
|
||||||
|
return value
|
||||||
|
elif not isinstance(value, uuid.UUID):
|
||||||
|
return uuid.UUID(value)
|
||||||
|
else:
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
# Define naming conventions for generated constraints
|
# Define naming conventions for generated constraints
|
|
@ -1,15 +0,0 @@
|
||||||
from .database import DbContext
|
|
||||||
from .models import (
|
|
||||||
User,
|
|
||||||
Lexicon,
|
|
||||||
Membership,
|
|
||||||
Character,
|
|
||||||
ArticleState,
|
|
||||||
Article,
|
|
||||||
IndexType,
|
|
||||||
ArticleIndex,
|
|
||||||
ArticleIndexRule,
|
|
||||||
ArticleContentRuleType,
|
|
||||||
ArticleContentRule,
|
|
||||||
Post,
|
|
||||||
)
|
|
|
@ -1,10 +1,32 @@
|
||||||
"""
|
|
||||||
Submodule of custom exception types
|
|
||||||
"""
|
|
||||||
|
|
||||||
class AmanuensisError(Exception):
|
class AmanuensisError(Exception):
|
||||||
"""Base class for exceptions in amanuensis"""
|
"""Base class for exceptions in amanuensis"""
|
||||||
|
|
||||||
|
|
||||||
|
class MissingConfigError(AmanuensisError):
|
||||||
|
"""A config file is missing that was expected to be present"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__("A config file or directory was expected to "
|
||||||
|
f"exist, but could not be found: {path}")
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigAlreadyExistsError(AmanuensisError):
|
||||||
|
"""Attempted to create a config, but it already exists"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__("Attempted to create a config, but it already "
|
||||||
|
f"exists: {path}")
|
||||||
|
|
||||||
|
|
||||||
|
class MalformedConfigError(AmanuensisError):
|
||||||
|
"""A config file could not be read and parsed"""
|
||||||
|
|
||||||
|
|
||||||
|
class ReadOnlyError(AmanuensisError):
|
||||||
|
"""A config was edited in readonly mode"""
|
||||||
|
|
||||||
|
|
||||||
class ArgumentError(AmanuensisError):
|
class ArgumentError(AmanuensisError):
|
||||||
"""An internal call was made with invalid arguments"""
|
"""An internal call was made with invalid arguments"""
|
||||||
|
|
||||||
|
|
||||||
|
class IndexMismatchError(AmanuensisError):
|
||||||
|
"""An id was obtained from an index, but the object doesn't exist"""
|
||||||
|
|
|
@ -1,11 +1,7 @@
|
||||||
"""
|
|
||||||
Data model SQL definitions
|
|
||||||
"""
|
|
||||||
import enum
|
import enum
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
Boolean,
|
Boolean,
|
||||||
Column,
|
Column,
|
||||||
CHAR,
|
|
||||||
DateTime,
|
DateTime,
|
||||||
Enum,
|
Enum,
|
||||||
ForeignKey,
|
ForeignKey,
|
||||||
|
@ -14,35 +10,11 @@ from sqlalchemy import (
|
||||||
Table,
|
Table,
|
||||||
Text,
|
Text,
|
||||||
text,
|
text,
|
||||||
TypeDecorator,
|
|
||||||
)
|
)
|
||||||
from sqlalchemy.orm import relationship, backref
|
from sqlalchemy.orm import relationship, backref
|
||||||
import uuid
|
from uuid import uuid4
|
||||||
|
|
||||||
from .database import ModelBase
|
from .database import ModelBase, Uuid
|
||||||
|
|
||||||
|
|
||||||
class Uuid(TypeDecorator):
|
|
||||||
"""
|
|
||||||
A uuid backed by a char(32) field in sqlite.
|
|
||||||
"""
|
|
||||||
impl = CHAR(32)
|
|
||||||
|
|
||||||
def process_bind_param(self, value, dialect):
|
|
||||||
if value is None:
|
|
||||||
return value
|
|
||||||
elif not isinstance(value, uuid.UUID):
|
|
||||||
return f'{uuid.UUID(value).int:32x}'
|
|
||||||
else:
|
|
||||||
return f'{value.int:32x}'
|
|
||||||
|
|
||||||
def process_result_value(self, value, dialect):
|
|
||||||
if value is None:
|
|
||||||
return value
|
|
||||||
elif not isinstance(value, uuid.UUID):
|
|
||||||
return uuid.UUID(value)
|
|
||||||
else:
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
class User(ModelBase):
|
class User(ModelBase):
|
||||||
|
@ -67,9 +39,6 @@ class User(ModelBase):
|
||||||
# Human-readable username as shown to other users
|
# Human-readable username as shown to other users
|
||||||
display_name = Column(String, nullable=False)
|
display_name = Column(String, nullable=False)
|
||||||
|
|
||||||
# The user's email address
|
|
||||||
email = Column(String, nullable=False)
|
|
||||||
|
|
||||||
# Whether the user can access site admin functions
|
# Whether the user can access site admin functions
|
||||||
is_site_admin = Column(Boolean, nullable=False, server_default=text('FALSE'))
|
is_site_admin = Column(Boolean, nullable=False, server_default=text('FALSE'))
|
||||||
|
|
||||||
|
@ -300,7 +269,7 @@ class Character(ModelBase):
|
||||||
id = Column(Integer, primary_key=True)
|
id = Column(Integer, primary_key=True)
|
||||||
|
|
||||||
# Public-facing character id
|
# Public-facing character id
|
||||||
public_id = Column(Uuid, nullable=False, unique=True, default=uuid.uuid4)
|
public_id = Column(Uuid, nullable=False, unique=True, default=uuid4)
|
||||||
|
|
||||||
# The lexicon to which this character belongs
|
# The lexicon to which this character belongs
|
||||||
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)
|
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)
|
||||||
|
@ -347,7 +316,7 @@ class Article(ModelBase):
|
||||||
id = Column(Integer, primary_key=True)
|
id = Column(Integer, primary_key=True)
|
||||||
|
|
||||||
# Public-facing article id
|
# Public-facing article id
|
||||||
public_id = Column(Uuid, nullable=False, unique=True, default=uuid.uuid4)
|
public_id = Column(Uuid, nullable=False, unique=True, default=uuid4)
|
||||||
|
|
||||||
# The lexicon to which this article belongs
|
# The lexicon to which this article belongs
|
||||||
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)
|
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)
|
|
@ -1,51 +1,25 @@
|
||||||
import pytest
|
import pytest
|
||||||
from sqlalchemy import func
|
from sqlalchemy import func
|
||||||
|
|
||||||
from amanuensis.db import *
|
from amanuensis.database import DbContext
|
||||||
import amanuensis.backend.user as userq
|
from amanuensis.models import *
|
||||||
from amanuensis.errors import ArgumentError
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def db():
|
def session():
|
||||||
db = DbContext('sqlite:///:memory:', debug=True)
|
db = DbContext('sqlite:///:memory:', debug=True)
|
||||||
db.create_all()
|
db.create_all()
|
||||||
return db
|
return db.session
|
||||||
|
|
||||||
|
|
||||||
def test_create(db):
|
def test_create(session):
|
||||||
"""Simple test that the database creates fine from scratch."""
|
"""Simple test that the database creates fine from scratch."""
|
||||||
assert db.session.query(func.count(User.id)).scalar() == 0
|
assert session.query(func.count(User.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(Lexicon.id)).scalar() == 0
|
assert session.query(func.count(Lexicon.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(Membership.id)).scalar() == 0
|
assert session.query(func.count(Membership.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(Character.id)).scalar() == 0
|
assert session.query(func.count(Character.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(Article.id)).scalar() == 0
|
assert session.query(func.count(Article.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(ArticleIndex.id)).scalar() == 0
|
assert session.query(func.count(ArticleIndex.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(ArticleIndexRule.id)).scalar() == 0
|
assert session.query(func.count(ArticleIndexRule.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(ArticleContentRule.id)).scalar() == 0
|
assert session.query(func.count(ArticleContentRule.id)).scalar() == 0
|
||||||
assert db.session.query(func.count(Post.id)).scalar() == 0
|
assert session.query(func.count(Post.id)).scalar() == 0
|
||||||
|
|
||||||
|
|
||||||
def test_create_user(db):
|
|
||||||
"""New user creation"""
|
|
||||||
kwargs = {
|
|
||||||
'username': 'username',
|
|
||||||
'password': 'password',
|
|
||||||
'display_name': 'User Name',
|
|
||||||
'email': 'user@example.com',
|
|
||||||
'is_site_admin': False
|
|
||||||
}
|
|
||||||
|
|
||||||
with pytest.raises(ArgumentError):
|
|
||||||
userq.create_user(db, **{**kwargs, 'username': 'user name'})
|
|
||||||
|
|
||||||
with pytest.raises(ArgumentError):
|
|
||||||
userq.create_user(db, **{**kwargs, 'password': None})
|
|
||||||
|
|
||||||
new_user = userq.create_user(db, **kwargs)
|
|
||||||
assert new_user
|
|
||||||
assert new_user.id is not None
|
|
||||||
assert new_user.created is not None
|
|
||||||
|
|
||||||
with pytest.raises(ArgumentError):
|
|
||||||
duplicate = userq.create_user(db, **kwargs)
|
|
||||||
|
|
Loading…
Reference in New Issue