Separate config read and edit to distinguish return types

This commit is contained in:
Tim Van Baak 2020-04-23 20:35:55 -07:00
parent 1f24f4f9b1
commit 50216281b2
4 changed files with 27 additions and 32 deletions

View File

@ -52,7 +52,7 @@ def command_generate_secret(args):
"""
root: RootConfigDirectoryContext = args.root
secret_key: bytes = os.urandom(32)
with root.config(edit=True) as cfg:
with root.edit_config() as cfg:
cfg.secret_key = secret_key.hex()
logger.info("Regenerated Flask secret key")
return 0
@ -106,11 +106,11 @@ def command_config(args):
return -1
if args.get:
with root.config(edit=False) as cfg:
with root.read_config() as cfg:
config_get(cfg, args.get)
if args.set:
with root.config(edit=True) as cfg:
with root.edit_config() as cfg:
config_set("config", cfg, args.set)
return 0

View File

@ -4,7 +4,7 @@ manager usage.
"""
import os
import re
from typing import Iterable, Union
from typing import Iterable
from amanuensis.config.context import json_ro, json_rw
from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError
@ -82,12 +82,13 @@ class ConfigFileConfigDirectoryContext(ConfigDirectoryContext):
if not os.path.isfile(config_path):
raise MissingConfigError(config_path)
def config(self, edit: bool = False) -> Union[json_ro, json_rw]:
"""Context manager for this object's config file."""
if edit:
return self.edit('config')
else:
return self.read('config')
def edit_config(self) -> json_rw:
"""rw context manager for this object's config file."""
return self.edit('config')
def read_config(self) -> json_ro:
"""ro context manager for this object's config file."""
return self.read('config')
class IndexDirectoryContext(ConfigDirectoryContext):
@ -102,25 +103,25 @@ class IndexDirectoryContext(ConfigDirectoryContext):
raise MissingConfigError(index_path)
self.cdc_type = cdc_type
def __getitem__(self, key: str):
def __getitem__(self, key: str) -> ConfigFileConfigDirectoryContext:
"""
Returns a context to the given item. key is treated as the
item's id if it's a guid string, otherwise it's treated as
the item's indexed name and run through the index first.
"""
if not is_guid(key):
with self.index() as index:
with self.read_index() as index:
iid = index.get(key)
if not iid:
raise MissingConfigError(key)
key = iid
return self.cdc_type(os.path.join(self.path, key))
def index(self, edit=False) -> Union[json_ro, json_rw]:
if edit:
return self.edit('index')
else:
return self.read('index')
def edit_index(self) -> json_rw:
return self.edit('index')
def read_index(self) -> json_ro:
return self.read('index')
class RootConfigDirectoryContext(ConfigFileConfigDirectoryContext):

View File

@ -5,7 +5,6 @@ from amanuensis.config import (
RootConfigDirectoryContext,
LexiconConfigDirectoryContext,
ReadOnlyOrderedDict)
from amanuensis.config.context import json_rw
class LexiconModel():
@ -13,8 +12,9 @@ class LexiconModel():
def __init__(self, root: RootConfigDirectoryContext, lid: str):
self._lid: str = lid
# Creating the config context implicitly checks for existence
self._ctx: LexiconConfigDirectoryContext = root.lexicon[lid]
with self._ctx.config(edit=False) as config:
self._ctx: LexiconConfigDirectoryContext = (
cast(LexiconConfigDirectoryContext, root.lexicon[lid]))
with self._ctx.read_config() as config:
self._cfg: ReadOnlyOrderedDict = cast(ReadOnlyOrderedDict, config)
def __str__(self) -> str:
@ -46,12 +46,9 @@ class LexiconModel():
def title(self) -> str:
return self.cfg.get('title', f'Lexicon {self.cfg.name}')
def edit(self) -> json_rw:
return cast(json_rw, self.ctx.config(edit=True))
def log(self, message: str) -> None:
now = int(time.time())
with self.edit() as cfg:
with self.ctx.edit_config() as cfg:
cfg.log.append([now, message])
@property

View File

@ -6,7 +6,6 @@ from amanuensis.config import (
RootConfigDirectoryContext,
UserConfigDirectoryContext,
ReadOnlyOrderedDict)
from amanuensis.config.context import json_rw
class UserModelBase():
@ -52,8 +51,9 @@ class UserModel(UserModelBase):
def __init__(self, root: RootConfigDirectoryContext, uid: str):
self._uid: str = uid
# Creating the config context implicitly checks for existence
self._ctx: UserConfigDirectoryContext = root.user[uid]
with self._ctx.config(edit=False) as config:
self._ctx: UserConfigDirectoryContext = (
cast(UserConfigDirectoryContext, root.user[uid]))
with self._ctx.read_config() as config:
self._cfg: ReadOnlyOrderedDict = cast(ReadOnlyOrderedDict, config)
def __str__(self) -> str:
@ -64,16 +64,13 @@ class UserModel(UserModelBase):
# Utility methods
def edit(self) -> json_rw:
return cast(json_rw, self.ctx.config(edit=True))
def set_password(self, password: str) -> None:
pw_hash = generate_password_hash(password)
with self.edit() as cfg:
with self.ctx.edit_config() as cfg:
cfg['password'] = pw_hash
def check_password(self, password) -> bool:
with self.ctx.config() as cfg:
with self.ctx.read_config() as cfg:
return check_password_hash(cfg.password, password)