Reorganize lexicon module
This commit is contained in:
parent
4886e27e5d
commit
a58761d7a5
|
@ -1,97 +1,6 @@
|
|||
import os
|
||||
import time
|
||||
from amanuensis.lexicon.admin import valid_name, create_lexicon
|
||||
|
||||
from amanuensis.errors import (
|
||||
ArgumentError, IndexMismatchError, MissingConfigError)
|
||||
from amanuensis.config import prepend, json_ro, json_rw, root
|
||||
|
||||
class LexiconModel():
|
||||
@staticmethod
|
||||
def by(lid=None, name=None):
|
||||
"""
|
||||
Gets the LexiconModel with the given lid or username
|
||||
|
||||
If the lid or name simply does not match an existing lexicon, returns
|
||||
None. If the lid matches the index but there is something wrong with
|
||||
the lexicon's config, raises an error.
|
||||
"""
|
||||
if lid and name:
|
||||
raise ArgumentError("lid and name both specified to Lexicon"
|
||||
"Model.by()")
|
||||
if not lid and not name:
|
||||
raise ArgumentError("One of lid or name must be not None")
|
||||
if not lid:
|
||||
with json_ro('lexicon', 'index.json') as index:
|
||||
lid = index.get(name)
|
||||
if not lid:
|
||||
return None
|
||||
if not os.path.isdir(prepend('lexicon', lid)):
|
||||
raise IndexMismatchError("lexicon={} lid={}".format(name, lid))
|
||||
if not os.path.isfile(prepend('lexicon', lid, 'config.json')):
|
||||
raise MissingConfigError("lid={}".format(lid))
|
||||
return LexiconModel(lid)
|
||||
|
||||
def __init__(self, lid):
|
||||
if not os.path.isdir(prepend('lexicon', lid)):
|
||||
raise ValueError("No lexicon with lid {}".format(lid))
|
||||
if not os.path.isfile(prepend('lexicon', lid, 'config.json')):
|
||||
raise FileNotFoundError("Lexicon {} missing config.json".format(lid))
|
||||
self.id = str(lid)
|
||||
self.config_path = prepend('lexicon', lid, 'config.json')
|
||||
with json_ro(self.config_path) as j:
|
||||
self.config = j
|
||||
self.ctx = root.lexicon[self.id]
|
||||
|
||||
def __getattr__(self, key):
|
||||
if key not in self.config:
|
||||
raise AttributeError(key)
|
||||
if key == 'title':
|
||||
return self.config.get('title') or f'Lexicon {self.config.name}'
|
||||
return self.config.get(key)
|
||||
|
||||
def __str__(self):
|
||||
return '<Lexicon {0.name}>'.format(self)
|
||||
|
||||
def __repr__(self):
|
||||
return '<LexiconModel lid={0.id} name={0.name}>'.format(self)
|
||||
|
||||
def edit(self):
|
||||
return json_rw(self.config_path)
|
||||
|
||||
def add_log(self, message):
|
||||
now = int(time.time())
|
||||
with self.edit() as j:
|
||||
j['log'].append([now, message])
|
||||
|
||||
def status(self):
|
||||
if self.turn.current is None:
|
||||
return "unstarted"
|
||||
if self.turn.current > self.turn.max:
|
||||
return "completed"
|
||||
return "ongoing"
|
||||
|
||||
def can_add_character(self, uid):
|
||||
return (
|
||||
# Players can't add more characters than chars_per_player
|
||||
(len(self.get_characters_for_player(uid))
|
||||
< self.join.chars_per_player)
|
||||
# Characters can only be added before the game starts
|
||||
and not self.turn.current)
|
||||
|
||||
def get_characters_for_player(self, uid=None):
|
||||
return [
|
||||
char for char in self.character.values()
|
||||
if uid is None or char.player == uid]
|
||||
|
||||
def get_drafts_for_player(self, uid):
|
||||
chars = self.get_characters_for_player(uid=uid)
|
||||
drafts_path = prepend('lexicon', self.id, 'draft')
|
||||
drafts = []
|
||||
for filename in os.listdir(drafts_path):
|
||||
for char in chars:
|
||||
if filename.startswith(str(char.cid)):
|
||||
drafts.append(filename)
|
||||
for i in range(len(drafts)):
|
||||
with json_ro(drafts_path, drafts[i]) as a:
|
||||
drafts[i] = a
|
||||
return drafts
|
||||
__all__ = [member.__name__ for member in [
|
||||
valid_name,
|
||||
create_lexicon,
|
||||
]]
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
"""
|
||||
Submodule of functions for creating and managing lexicons within the
|
||||
general Amanuensis context.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from amanuensis.config import RootConfigDirectoryContext, AttrOrderedDict
|
||||
from amanuensis.errors import ArgumentError
|
||||
from amanuensis.models import ModelFactory, UserModel, LexiconModel
|
||||
from amanuensis.resources import get_stream
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def valid_name(name: str) -> bool:
|
||||
"""
|
||||
Validates that a lexicon name consists only of alpahnumerics, dashes,
|
||||
underscores, and spaces
|
||||
"""
|
||||
return re.match(r'^[A-Za-z0-9-_ ]+$', name) is not None
|
||||
|
||||
|
||||
def create_lexicon(
|
||||
root: RootConfigDirectoryContext,
|
||||
model_factory: ModelFactory,
|
||||
name: str,
|
||||
editor: UserModel) -> LexiconModel:
|
||||
"""
|
||||
Creates a lexicon with the given name and sets the given user as its editor
|
||||
"""
|
||||
# Verify arguments
|
||||
if not name:
|
||||
raise ArgumentError(f'Empty lexicon name: "{name}"')
|
||||
if not valid_name(name):
|
||||
raise ArgumentError(f'Invalid lexicon name: "{name}"')
|
||||
with root.lexicon.read_index() as extant_lexicons:
|
||||
if name in extant_lexicons.keys():
|
||||
raise ArgumentError(f'Lexicon name already taken: "{name}"')
|
||||
if editor is None:
|
||||
raise ArgumentError('Editor must not be None')
|
||||
|
||||
# Create the lexicon directory and initialize it with a blank lexicon
|
||||
lid: str = uuid.uuid4().hex
|
||||
lex_dir = os.path.join(root.lexicon.path, lid)
|
||||
os.mkdir(lex_dir)
|
||||
with get_stream("lexicon.json") as s:
|
||||
path: str = os.path.join(lex_dir, 'config.json')
|
||||
with open(path, 'wb') as f:
|
||||
f.write(s.read())
|
||||
|
||||
# Create subdirectories
|
||||
os.mkdir(os.path.join(lex_dir, 'draft'))
|
||||
os.mkdir(os.path.join(lex_dir, 'src'))
|
||||
os.mkdir(os.path.join(lex_dir, 'article'))
|
||||
|
||||
# Update the index with the new lexicon
|
||||
with root.lexicon.edit_index() as index:
|
||||
index[name] = lid
|
||||
|
||||
# Fill out the new lexicon
|
||||
with root.lexicon[lid].edit_config() as cfg:
|
||||
cfg.lid = lid
|
||||
cfg.name = name
|
||||
cfg.editor = editor.uid
|
||||
cfg.time.created = int(time.time())
|
||||
|
||||
with root.lexicon[lid].edit('info', create=True):
|
||||
pass # Create an empry config file
|
||||
|
||||
# Load the lexicon and add the editor and default character
|
||||
lexicon = model_factory.lexicon(lid)
|
||||
with lexicon.ctx.edit_config() as cfg:
|
||||
cfg.join.joined.append(editor.uid)
|
||||
with get_stream('character.json') as template:
|
||||
character = json.load(template, object_pairs_hook=AttrOrderedDict)
|
||||
character.cid = 'default'
|
||||
character.name = 'Ersatz Scrivener'
|
||||
character.player = None
|
||||
cfg.character.new(character.cid, character)
|
||||
|
||||
# Log the creation
|
||||
message = f'Created {lexicon.title}, ed. {editor.cfg.displayname} ({lid})'
|
||||
lexicon.log(message)
|
||||
logger.info(message)
|
||||
|
||||
return lexicon
|
|
@ -0,0 +1,4 @@
|
|||
"""
|
||||
Submodule of functions for managing lexicon games during the core game
|
||||
loop of writing and publishing articles.
|
||||
"""
|
|
@ -16,77 +16,8 @@ from amanuensis.lexicon import LexiconModel
|
|||
from amanuensis.parser import parse_raw_markdown, GetCitations, HtmlRenderer, filesafe_title, titlesort
|
||||
from amanuensis.resources import get_stream
|
||||
|
||||
def valid_name(name):
|
||||
"""
|
||||
Validates that a lexicon name consists only of alpahnumerics, dashes,
|
||||
underscores, and spaces
|
||||
"""
|
||||
return re.match(r"^[A-Za-z0-9-_ ]+$", name) is not None
|
||||
|
||||
|
||||
def create_lexicon(name, editor):
|
||||
"""
|
||||
Creates a lexicon with the given name and sets the given user as its editor
|
||||
"""
|
||||
# Verify arguments
|
||||
if not name:
|
||||
raise ArgumentError('Empty lexicon name: "{}"'.format(name))
|
||||
if not valid_name(name):
|
||||
raise ArgumentError('Invalid lexicon name: "{}"'.format(name))
|
||||
with json_ro('lexicon', 'index.json') as index:
|
||||
if name in index.keys():
|
||||
raise ArgumentError('Lexicon name already taken: "{}"'.format(
|
||||
name))
|
||||
if editor is None:
|
||||
raise ArgumentError("Invalid editor: '{}'".format(editor))
|
||||
|
||||
# Create the lexicon directory and initialize it with a blank lexicon
|
||||
lid = uuid.uuid4().hex
|
||||
lex_dir = prepend("lexicon", lid)
|
||||
os.mkdir(lex_dir)
|
||||
with get_stream("lexicon.json") as s:
|
||||
with open(prepend(lex_dir, 'config.json'), 'wb') as f:
|
||||
f.write(s.read())
|
||||
|
||||
# Fill out the new lexicon
|
||||
with json_rw(lex_dir, 'config.json') as cfg:
|
||||
cfg['lid'] = lid
|
||||
cfg['name'] = name
|
||||
cfg['editor'] = editor.uid
|
||||
cfg['time']['created'] = int(time.time())
|
||||
|
||||
with json_rw(lex_dir, 'info.json', new=True) as info:
|
||||
pass
|
||||
|
||||
# Create subdirectories
|
||||
os.mkdir(prepend(lex_dir, 'draft'))
|
||||
os.mkdir(prepend(lex_dir, 'src'))
|
||||
os.mkdir(prepend(lex_dir, 'article'))
|
||||
|
||||
# Update the index with the new lexicon
|
||||
with json_rw('lexicon', 'index.json') as index:
|
||||
index[name] = lid
|
||||
|
||||
# Load the Lexicon and log creation
|
||||
l = LexiconModel(lid)
|
||||
l.add_log("Lexicon created")
|
||||
|
||||
logger.info("Created Lexicon {0.name}, ed. {1.displayname} ({0.id})".format(
|
||||
l, editor))
|
||||
|
||||
# Add the editor
|
||||
add_player(l, editor)
|
||||
|
||||
# Add the fallback character
|
||||
add_character(l, editor, {
|
||||
"cid": "default",
|
||||
"name": "Ersatz Scrivener",
|
||||
"player": None,
|
||||
})
|
||||
with l.edit() as cfg:
|
||||
cfg.character.default.player = None
|
||||
|
||||
return l
|
||||
|
||||
|
||||
def delete_lexicon(lex, purge=False):
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
"""
|
||||
Submodule of functions for managing lexicon games during the setup and
|
||||
joining part of the game lifecycle.
|
||||
"""
|
Loading…
Reference in New Issue