Compare commits

..

No commits in common. "8a3893f33d294d68905a08e512f3d4ab67d00a78" and "fbf9b59456ca3ef15d845d3ae1e9665387e73f3d" have entirely different histories.

7 changed files with 72 additions and 149 deletions

View File

@ -1,49 +0,0 @@
"""
User query interface
"""
import re
import uuid
from amanuensis.db import DbContext, User
from amanuensis.errors import ArgumentError
def create_user(
db: DbContext,
username: str,
password: str,
display_name: str,
email: str,
is_site_admin: bool) -> User:
"""
Create a new user.
"""
# Verify username
if len(username) < 3 or len(username) > 32:
raise ArgumentError('Username must be between 3 and 32 characters')
if re.match(r'^[0-9-_]*$', username):
raise ArgumentError('Username must contain a letter')
if not re.match(r'^[A-Za-z0-9-_]*$', username):
raise ArgumentError('Username may only contain alphanumerics, dash, and underscore')
# Verify password
if not password:
raise ArgumentError('Password must be provided')
# If display name is not provided, use the username
if not display_name.strip():
display_name = username
# Query the db to make sure the username isn't taken
if db.session.query(User.username == username).count() > 0:
raise ArgumentError('Username is already taken')
new_user = User(
username=username,
password=password,
display_name=display_name,
email=email,
is_site_admin=is_site_admin,
)
db.session.add(new_user)
db.session.commit()
return new_user

View File

@ -1,10 +1,32 @@
""" from sqlalchemy import create_engine, MetaData, event, TypeDecorator, CHAR
Database connection setup
"""
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
import sqlite3
import uuid
class Uuid(TypeDecorator):
"""
A uuid backed by a char(32) field in sqlite.
"""
impl = CHAR(32)
def process_bind_param(self, value, dialect):
if value is None:
return value
elif not isinstance(value, uuid.UUID):
return f'{uuid.UUID(value).int:32x}'
else:
return f'{value.int:32x}'
def process_result_value(self, value, dialect):
if value is None:
return value
elif not isinstance(value, uuid.UUID):
return uuid.UUID(value)
else:
return value
# Define naming conventions for generated constraints # Define naming conventions for generated constraints

View File

@ -1,15 +0,0 @@
from .database import DbContext
from .models import (
User,
Lexicon,
Membership,
Character,
ArticleState,
Article,
IndexType,
ArticleIndex,
ArticleIndexRule,
ArticleContentRuleType,
ArticleContentRule,
Post,
)

View File

@ -1,10 +1,32 @@
"""
Submodule of custom exception types
"""
class AmanuensisError(Exception): class AmanuensisError(Exception):
"""Base class for exceptions in amanuensis""" """Base class for exceptions in amanuensis"""
class MissingConfigError(AmanuensisError):
"""A config file is missing that was expected to be present"""
def __init__(self, path):
super().__init__("A config file or directory was expected to "
f"exist, but could not be found: {path}")
class ConfigAlreadyExistsError(AmanuensisError):
"""Attempted to create a config, but it already exists"""
def __init__(self, path):
super().__init__("Attempted to create a config, but it already "
f"exists: {path}")
class MalformedConfigError(AmanuensisError):
"""A config file could not be read and parsed"""
class ReadOnlyError(AmanuensisError):
"""A config was edited in readonly mode"""
class ArgumentError(AmanuensisError): class ArgumentError(AmanuensisError):
"""An internal call was made with invalid arguments""" """An internal call was made with invalid arguments"""
class IndexMismatchError(AmanuensisError):
"""An id was obtained from an index, but the object doesn't exist"""

View File

@ -1,11 +1,7 @@
"""
Data model SQL definitions
"""
import enum import enum
from sqlalchemy import ( from sqlalchemy import (
Boolean, Boolean,
Column, Column,
CHAR,
DateTime, DateTime,
Enum, Enum,
ForeignKey, ForeignKey,
@ -14,35 +10,11 @@ from sqlalchemy import (
Table, Table,
Text, Text,
text, text,
TypeDecorator,
) )
from sqlalchemy.orm import relationship, backref from sqlalchemy.orm import relationship, backref
import uuid from uuid import uuid4
from .database import ModelBase from .database import ModelBase, Uuid
class Uuid(TypeDecorator):
"""
A uuid backed by a char(32) field in sqlite.
"""
impl = CHAR(32)
def process_bind_param(self, value, dialect):
if value is None:
return value
elif not isinstance(value, uuid.UUID):
return f'{uuid.UUID(value).int:32x}'
else:
return f'{value.int:32x}'
def process_result_value(self, value, dialect):
if value is None:
return value
elif not isinstance(value, uuid.UUID):
return uuid.UUID(value)
else:
return value
class User(ModelBase): class User(ModelBase):
@ -67,9 +39,6 @@ class User(ModelBase):
# Human-readable username as shown to other users # Human-readable username as shown to other users
display_name = Column(String, nullable=False) display_name = Column(String, nullable=False)
# The user's email address
email = Column(String, nullable=False)
# Whether the user can access site admin functions # Whether the user can access site admin functions
is_site_admin = Column(Boolean, nullable=False, server_default=text('FALSE')) is_site_admin = Column(Boolean, nullable=False, server_default=text('FALSE'))
@ -300,7 +269,7 @@ class Character(ModelBase):
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
# Public-facing character id # Public-facing character id
public_id = Column(Uuid, nullable=False, unique=True, default=uuid.uuid4) public_id = Column(Uuid, nullable=False, unique=True, default=uuid4)
# The lexicon to which this character belongs # The lexicon to which this character belongs
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False) lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)
@ -347,7 +316,7 @@ class Article(ModelBase):
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)
# Public-facing article id # Public-facing article id
public_id = Column(Uuid, nullable=False, unique=True, default=uuid.uuid4) public_id = Column(Uuid, nullable=False, unique=True, default=uuid4)
# The lexicon to which this article belongs # The lexicon to which this article belongs
lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False) lexicon_id = Column(Integer, ForeignKey('lexicon.id'), nullable=False)

View File

@ -1,51 +1,25 @@
import pytest import pytest
from sqlalchemy import func from sqlalchemy import func
from amanuensis.db import * from amanuensis.database import DbContext
import amanuensis.backend.user as userq from amanuensis.models import *
from amanuensis.errors import ArgumentError
@pytest.fixture @pytest.fixture
def db(): def session():
db = DbContext('sqlite:///:memory:', debug=True) db = DbContext('sqlite:///:memory:', debug=True)
db.create_all() db.create_all()
return db return db.session
def test_create(db): def test_create(session):
"""Simple test that the database creates fine from scratch.""" """Simple test that the database creates fine from scratch."""
assert db.session.query(func.count(User.id)).scalar() == 0 assert session.query(func.count(User.id)).scalar() == 0
assert db.session.query(func.count(Lexicon.id)).scalar() == 0 assert session.query(func.count(Lexicon.id)).scalar() == 0
assert db.session.query(func.count(Membership.id)).scalar() == 0 assert session.query(func.count(Membership.id)).scalar() == 0
assert db.session.query(func.count(Character.id)).scalar() == 0 assert session.query(func.count(Character.id)).scalar() == 0
assert db.session.query(func.count(Article.id)).scalar() == 0 assert session.query(func.count(Article.id)).scalar() == 0
assert db.session.query(func.count(ArticleIndex.id)).scalar() == 0 assert session.query(func.count(ArticleIndex.id)).scalar() == 0
assert db.session.query(func.count(ArticleIndexRule.id)).scalar() == 0 assert session.query(func.count(ArticleIndexRule.id)).scalar() == 0
assert db.session.query(func.count(ArticleContentRule.id)).scalar() == 0 assert session.query(func.count(ArticleContentRule.id)).scalar() == 0
assert db.session.query(func.count(Post.id)).scalar() == 0 assert session.query(func.count(Post.id)).scalar() == 0
def test_create_user(db):
"""New user creation"""
kwargs = {
'username': 'username',
'password': 'password',
'display_name': 'User Name',
'email': 'user@example.com',
'is_site_admin': False
}
with pytest.raises(ArgumentError):
userq.create_user(db, **{**kwargs, 'username': 'user name'})
with pytest.raises(ArgumentError):
userq.create_user(db, **{**kwargs, 'password': None})
new_user = userq.create_user(db, **kwargs)
assert new_user
assert new_user.id is not None
assert new_user.created is not None
with pytest.raises(ArgumentError):
duplicate = userq.create_user(db, **kwargs)