Refactor config module to encapsulate
This commit is contained in:
parent
28c1ef2dd5
commit
ab25d5174a
|
@ -1,15 +1,6 @@
|
||||||
# Standard library imports
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
|
|
||||||
# Module imports
|
# Module imports
|
||||||
from amanuensis.errors import MissingConfigError, MalformedConfigError
|
from amanuensis.config.dict import AttrOrderedDict, ReadOnlyOrderedDict
|
||||||
import amanuensis.config.context
|
from amanuensis.config.directory import RootConfigDirectoryContext, is_guid
|
||||||
from amanuensis.config.context import is_guid
|
|
||||||
import amanuensis.config.init
|
|
||||||
import amanuensis.config.loader
|
|
||||||
|
|
||||||
|
|
||||||
# Environment variable name constants
|
# Environment variable name constants
|
||||||
ENV_SECRET_KEY = "AMANUENSIS_SECRET_KEY"
|
ENV_SECRET_KEY = "AMANUENSIS_SECRET_KEY"
|
||||||
|
@ -18,47 +9,9 @@ ENV_LOG_FILE = "AMANUENSIS_LOG_FILE"
|
||||||
ENV_LOG_FILE_SIZE = "AMANUENSIS_LOG_FILE_SIZE"
|
ENV_LOG_FILE_SIZE = "AMANUENSIS_LOG_FILE_SIZE"
|
||||||
ENV_LOG_FILE_NUM = "AMANUENSIS_LOG_FILE_NUM"
|
ENV_LOG_FILE_NUM = "AMANUENSIS_LOG_FILE_NUM"
|
||||||
|
|
||||||
#
|
__all__ = [
|
||||||
# The config directory can be set by cli input, so the config infrastructure
|
AttrOrderedDict.__name__,
|
||||||
# needs to wait for initialization before it can load any configs.
|
ReadOnlyOrderedDict.__name__,
|
||||||
#
|
RootConfigDirectoryContext.__name__,
|
||||||
CONFIG_DIR = None
|
is_guid.__name__,
|
||||||
GLOBAL_CONFIG = None
|
]
|
||||||
logger = None
|
|
||||||
root = None
|
|
||||||
|
|
||||||
def init_config(args):
|
|
||||||
"""
|
|
||||||
Initializes the config infrastructure to read configs from the
|
|
||||||
directory given by args.config_dir. Initializes logging.
|
|
||||||
"""
|
|
||||||
global CONFIG_DIR, GLOBAL_CONFIG, logger, root
|
|
||||||
CONFIG_DIR = args.config_dir
|
|
||||||
amanuensis.config.init.verify_config_dir(CONFIG_DIR)
|
|
||||||
with amanuensis.config.loader.json_ro(
|
|
||||||
os.path.join(CONFIG_DIR, "config.json")) as cfg:
|
|
||||||
GLOBAL_CONFIG = cfg
|
|
||||||
amanuensis.config.init.init_logging(args, GLOBAL_CONFIG['logging'])
|
|
||||||
logger = logging.getLogger("amanuensis")
|
|
||||||
root = amanuensis.config.context.RootConfigDirectoryContext(CONFIG_DIR)
|
|
||||||
|
|
||||||
def get(key):
|
|
||||||
return GLOBAL_CONFIG[key]
|
|
||||||
|
|
||||||
def prepend(*path):
|
|
||||||
joined = os.path.join(*path)
|
|
||||||
if not joined.startswith(CONFIG_DIR):
|
|
||||||
joined = os.path.join(CONFIG_DIR, joined)
|
|
||||||
return joined
|
|
||||||
|
|
||||||
def open_sh(*path, **kwargs):
|
|
||||||
return amanuensis.config.loader.open_sh(prepend(*path), **kwargs)
|
|
||||||
|
|
||||||
def open_ex(*path, **kwargs):
|
|
||||||
return amanuensis.config.loader.open_ex(prepend(*path), **kwargs)
|
|
||||||
|
|
||||||
def json_ro(*path, **kwargs):
|
|
||||||
return amanuensis.config.loader.json_ro(prepend(*path), **kwargs)
|
|
||||||
|
|
||||||
def json_rw(*path, **kwargs):
|
|
||||||
return amanuensis.config.loader.json_rw(prepend(*path), **kwargs)
|
|
||||||
|
|
|
@ -1,140 +1,82 @@
|
||||||
import os
|
"""
|
||||||
import re
|
`with` context managers for mediating config file access.
|
||||||
|
"""
|
||||||
|
# Standard library imports
|
||||||
|
import fcntl
|
||||||
|
import json
|
||||||
|
|
||||||
from amanuensis.config.loader import json_ro, json_rw
|
# Application imports
|
||||||
from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError
|
from amanuensis.config.dict import AttrOrderedDict, ReadOnlyOrderedDict
|
||||||
|
|
||||||
|
|
||||||
def is_guid(s):
|
class open_lock():
|
||||||
return re.match(r'[0-9a-z]{32}', s.lower())
|
"""A context manager that opens a file with the specified file lock"""
|
||||||
|
def __init__(self, path, mode, lock_type):
|
||||||
|
self.fd = open(path, mode, encoding='utf8')
|
||||||
|
fcntl.lockf(self.fd, lock_type)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self.fd
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
||||||
|
self.fd.close()
|
||||||
|
|
||||||
|
|
||||||
class ConfigDirectoryContext():
|
class open_sh(open_lock):
|
||||||
|
"""A context manager that opens a file with a shared lock"""
|
||||||
|
def __init__(self, path, mode):
|
||||||
|
super().__init__(path, mode, fcntl.LOCK_SH)
|
||||||
|
|
||||||
|
|
||||||
|
class open_ex(open_lock):
|
||||||
|
"""A context manager that opens a file with an exclusive lock"""
|
||||||
|
def __init__(self, path, mode):
|
||||||
|
super().__init__(path, mode, fcntl.LOCK_EX)
|
||||||
|
|
||||||
|
|
||||||
|
class json_ro(open_sh):
|
||||||
"""
|
"""
|
||||||
Base class for CRUD operations on config files in a config
|
A context manager that opens a file in a shared, read-only mode.
|
||||||
directory.
|
The contents of the file are read as JSON and returned as a read-
|
||||||
|
only OrderedDict.
|
||||||
"""
|
"""
|
||||||
def __init__(self, path):
|
def __init__(self, path):
|
||||||
self.path = path
|
super().__init__(path, 'r')
|
||||||
if not os.path.isdir(self.path):
|
self.config = None
|
||||||
raise MissingConfigError(path)
|
|
||||||
|
|
||||||
def new(self, filename):
|
def __enter__(self) -> ReadOnlyOrderedDict:
|
||||||
"""
|
self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict)
|
||||||
Creates a JSON file that doesn't already exist.
|
return self.config
|
||||||
"""
|
|
||||||
if not filename.endswith('.json'):
|
|
||||||
filename = f'{filename}.json'
|
|
||||||
fpath = os.path.join(self.path, filename)
|
|
||||||
if os.path.isfile(fpath):
|
|
||||||
raise ConfigAlreadyExistsError(fpath)
|
|
||||||
return json_rw(fpath, new=True)
|
|
||||||
|
|
||||||
def read(self, filename):
|
|
||||||
"""
|
|
||||||
Loads a JSON file in read-only mode.
|
|
||||||
"""
|
|
||||||
if not filename.endswith('.json'):
|
|
||||||
filename = f'{filename}.json'
|
|
||||||
fpath = os.path.join(self.path, filename)
|
|
||||||
if not os.path.isfile(fpath):
|
|
||||||
raise MissingConfigError(fpath)
|
|
||||||
return json_ro(fpath)
|
|
||||||
|
|
||||||
def edit(self, filename, create=False):
|
|
||||||
"""
|
|
||||||
Loads a JSON file in write mode.
|
|
||||||
"""
|
|
||||||
if not filename.endswith('.json'):
|
|
||||||
filename = f'{filename}.json'
|
|
||||||
fpath = os.path.join(self.path, filename)
|
|
||||||
if not create and not os.path.isfile(fpath):
|
|
||||||
raise MissingConfigError(fpath)
|
|
||||||
return json_rw(fpath, new=create)
|
|
||||||
|
|
||||||
def delete(self, filename):
|
|
||||||
"""Deletes a file."""
|
|
||||||
if not filename.endswith('.json'):
|
|
||||||
filename = f'{filename}.json'
|
|
||||||
fpath = os.path.join(self.path, filename)
|
|
||||||
if not os.path.isfile(fpath):
|
|
||||||
raise MissingConfigError(fpath)
|
|
||||||
os.remove(fpath)
|
|
||||||
|
|
||||||
def ls(self):
|
|
||||||
"""Lists all files in this directory."""
|
|
||||||
filenames = os.listdir(self.path)
|
|
||||||
return filenames
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigFileMixin():
|
class json_rw(open_ex):
|
||||||
"""Mixin for objects that have config files."""
|
"""
|
||||||
def config(self, edit=False):
|
A context manager that opens a file with an exclusive lock. The
|
||||||
"""Context manager for this object's config file."""
|
file mode defaults to r+, which requires that the file exist. The
|
||||||
if edit:
|
file mode can be set to w+ to create a new file by setting the new
|
||||||
return self.edit('config')
|
kwarg in the ctor. The contents of the file are read as JSON and
|
||||||
|
returned in an AttrOrderedDict. Any changes to the context dict
|
||||||
|
will be written out to the file when the context manager exits,
|
||||||
|
unless an exception is raised before exiting.
|
||||||
|
"""
|
||||||
|
def __init__(self, path, new=False):
|
||||||
|
mode = 'w+' if new else 'r+'
|
||||||
|
super().__init__(path, mode)
|
||||||
|
self.config = None
|
||||||
|
self.new = new
|
||||||
|
|
||||||
|
def __enter__(self) -> AttrOrderedDict:
|
||||||
|
if not self.new:
|
||||||
|
self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict)
|
||||||
else:
|
else:
|
||||||
return self.read('config')
|
self.config = AttrOrderedDict()
|
||||||
|
return self.config
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
class IndexDirectoryContext(ConfigDirectoryContext):
|
# Only write the new value out if there wasn't an exception
|
||||||
"""
|
if not exc_type:
|
||||||
A lookup layer for getting config directory contexts for lexicon
|
self.fd.seek(0)
|
||||||
or user directories.
|
json.dump(self.config, self.fd, allow_nan=False, indent='\t')
|
||||||
"""
|
self.fd.truncate()
|
||||||
def __init__(self, path, cdc_type):
|
super().__exit__(exc_type, exc_value, traceback)
|
||||||
super().__init__(path)
|
|
||||||
self.cdc_type = cdc_type
|
|
||||||
|
|
||||||
def __getitem__(self, key):
|
|
||||||
"""
|
|
||||||
Returns a context to the given item. key is treated as the
|
|
||||||
item's id if it's a guid string, otherwise it's treated as
|
|
||||||
the item's indexed name and run through the index first.
|
|
||||||
"""
|
|
||||||
if not is_guid(key):
|
|
||||||
with self.index() as index:
|
|
||||||
iid = index.get(key)
|
|
||||||
if not iid:
|
|
||||||
raise MissingConfigError(key)
|
|
||||||
key = iid
|
|
||||||
return self.cdc_type(os.path.join(self.path, key))
|
|
||||||
|
|
||||||
def index(self, edit=False):
|
|
||||||
if edit:
|
|
||||||
return self.edit('index')
|
|
||||||
else:
|
|
||||||
return self.read('index')
|
|
||||||
|
|
||||||
|
|
||||||
class RootConfigDirectoryContext(ConfigDirectoryContext):
|
|
||||||
"""
|
|
||||||
Context for the config directory with links to the lexicon and
|
|
||||||
user contexts.
|
|
||||||
"""
|
|
||||||
def __init__(self, path):
|
|
||||||
super().__init__(path)
|
|
||||||
self.lexicon = IndexDirectoryContext(
|
|
||||||
os.path.join(self.path, 'lexicon'),
|
|
||||||
LexiconConfigDirectoryContext)
|
|
||||||
self.user = IndexDirectoryContext(
|
|
||||||
os.path.join(self.path, 'user'),
|
|
||||||
UserConfigDirectoryContext)
|
|
||||||
|
|
||||||
|
|
||||||
class LexiconConfigDirectoryContext(ConfigFileMixin, ConfigDirectoryContext):
|
|
||||||
"""
|
|
||||||
A config context for a lexicon's config directory.
|
|
||||||
"""
|
|
||||||
def __init__(self, path):
|
|
||||||
super().__init__(path)
|
|
||||||
self.draft = ConfigDirectoryContext(os.path.join(self.path, 'draft'))
|
|
||||||
self.src = ConfigDirectoryContext(os.path.join(self.path, 'src'))
|
|
||||||
self.article = ConfigDirectoryContext(os.path.join(self.path, 'article'))
|
|
||||||
|
|
||||||
|
|
||||||
class UserConfigDirectoryContext(ConfigFileMixin, ConfigDirectoryContext):
|
|
||||||
"""
|
|
||||||
A config context for a user's config directory.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
"""
|
||||||
|
Dictionary classes used to represent JSON config files in memory.
|
||||||
|
"""
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
|
from amanuensis.errors import ReadOnlyError
|
||||||
|
|
||||||
|
|
||||||
|
class AttrOrderedDict(OrderedDict):
|
||||||
|
"""
|
||||||
|
An OrderedDict with attribute access to known keys and explicit
|
||||||
|
creation of new keys.
|
||||||
|
"""
|
||||||
|
def __getattr__(self, key):
|
||||||
|
if key not in self:
|
||||||
|
raise AttributeError(key)
|
||||||
|
return self[key]
|
||||||
|
|
||||||
|
def __setattr__(self, key, value):
|
||||||
|
if key not in self:
|
||||||
|
raise AttributeError(key)
|
||||||
|
self[key] = value
|
||||||
|
|
||||||
|
def new(self, key, value):
|
||||||
|
"""Setter for adding new keys"""
|
||||||
|
if key in self:
|
||||||
|
raise KeyError("Key already exists: '{}'".format(key))
|
||||||
|
self[key] = value
|
||||||
|
|
||||||
|
|
||||||
|
class ReadOnlyOrderedDict(OrderedDict):
|
||||||
|
"""
|
||||||
|
An OrderedDict that cannot be modified with attribute access to
|
||||||
|
known keys.
|
||||||
|
"""
|
||||||
|
def __readonly__(self, *args, **kwargs):
|
||||||
|
raise ReadOnlyError("Cannot modify a ReadOnlyOrderedDict")
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super(ReadOnlyOrderedDict, self).__init__(*args, **kwargs)
|
||||||
|
self.__setitem__ = self.__readonly__
|
||||||
|
self.__delitem__ = self.__readonly__
|
||||||
|
self.pop = self.__readonly__
|
||||||
|
self.popitem = self.__readonly__
|
||||||
|
self.clear = self.__readonly__
|
||||||
|
self.update = self.__readonly__
|
||||||
|
self.setdefault = self.__readonly__
|
||||||
|
|
||||||
|
def __getattr__(self, key):
|
||||||
|
if key not in self:
|
||||||
|
raise AttributeError(key)
|
||||||
|
return self[key]
|
|
@ -0,0 +1,158 @@
|
||||||
|
"""
|
||||||
|
Config directory abstractions that encapsulate path munging and context
|
||||||
|
manager usage.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from typing import Iterable, Union
|
||||||
|
|
||||||
|
from amanuensis.config.context import json_ro, json_rw
|
||||||
|
from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError
|
||||||
|
|
||||||
|
|
||||||
|
def is_guid(s: str) -> bool:
|
||||||
|
return bool(re.match(r'[0-9a-z]{32}', s.lower()))
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigDirectoryContext():
|
||||||
|
"""
|
||||||
|
Base class for CRUD operations on config files in a config
|
||||||
|
directory.
|
||||||
|
"""
|
||||||
|
def __init__(self, path: str):
|
||||||
|
self.path: str = path
|
||||||
|
if not os.path.isdir(self.path):
|
||||||
|
raise MissingConfigError(path)
|
||||||
|
|
||||||
|
def new(self, filename) -> json_rw:
|
||||||
|
"""
|
||||||
|
Creates a JSON file that doesn't already exist.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath: str = os.path.join(self.path, filename)
|
||||||
|
if os.path.isfile(fpath):
|
||||||
|
raise ConfigAlreadyExistsError(fpath)
|
||||||
|
return json_rw(fpath, new=True)
|
||||||
|
|
||||||
|
def read(self, filename) -> json_ro:
|
||||||
|
"""
|
||||||
|
Loads a JSON file in read-only mode.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath: str = os.path.join(self.path, filename)
|
||||||
|
if not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(fpath)
|
||||||
|
return json_ro(fpath)
|
||||||
|
|
||||||
|
def edit(self, filename, create=False) -> json_rw:
|
||||||
|
"""
|
||||||
|
Loads a JSON file in write mode.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath: str = os.path.join(self.path, filename)
|
||||||
|
if not create and not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(fpath)
|
||||||
|
return json_rw(fpath, new=create)
|
||||||
|
|
||||||
|
def delete(self, filename) -> None:
|
||||||
|
"""Deletes a file."""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath: str = os.path.join(self.path, filename)
|
||||||
|
if not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(fpath)
|
||||||
|
os.remove(fpath)
|
||||||
|
|
||||||
|
def ls(self) -> Iterable[str]:
|
||||||
|
"""Lists all files in this directory."""
|
||||||
|
filenames: Iterable[str] = os.listdir(self.path)
|
||||||
|
return filenames
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigFileConfigDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
Config directory with a `config.json`.
|
||||||
|
"""
|
||||||
|
def __init__(self, path: str):
|
||||||
|
super().__init__(path)
|
||||||
|
config_path = os.path.join(self.path, 'config.json')
|
||||||
|
if not os.path.isfile(config_path):
|
||||||
|
raise MissingConfigError(config_path)
|
||||||
|
|
||||||
|
def config(self, edit: bool = False) -> Union[json_ro, json_rw]:
|
||||||
|
"""Context manager for this object's config file."""
|
||||||
|
if edit:
|
||||||
|
return self.edit('config')
|
||||||
|
else:
|
||||||
|
return self.read('config')
|
||||||
|
|
||||||
|
|
||||||
|
class IndexDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A lookup layer for getting config directory contexts for lexicon
|
||||||
|
or user directories.
|
||||||
|
"""
|
||||||
|
def __init__(self, path: str, cdc_type: type):
|
||||||
|
super().__init__(path)
|
||||||
|
index_path = os.path.join(self.path, 'index.json')
|
||||||
|
if not os.path.isfile(index_path):
|
||||||
|
raise MissingConfigError(index_path)
|
||||||
|
self.cdc_type = cdc_type
|
||||||
|
|
||||||
|
def __getitem__(self, key: str):
|
||||||
|
"""
|
||||||
|
Returns a context to the given item. key is treated as the
|
||||||
|
item's id if it's a guid string, otherwise it's treated as
|
||||||
|
the item's indexed name and run through the index first.
|
||||||
|
"""
|
||||||
|
if not is_guid(key):
|
||||||
|
with self.index() as index:
|
||||||
|
iid = index.get(key)
|
||||||
|
if not iid:
|
||||||
|
raise MissingConfigError(key)
|
||||||
|
key = iid
|
||||||
|
return self.cdc_type(os.path.join(self.path, key))
|
||||||
|
|
||||||
|
def index(self, edit=False) -> Union[json_ro, json_rw]:
|
||||||
|
if edit:
|
||||||
|
return self.edit('index')
|
||||||
|
else:
|
||||||
|
return self.read('index')
|
||||||
|
|
||||||
|
|
||||||
|
class RootConfigDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
Context for the config directory with links to the lexicon and
|
||||||
|
user contexts.
|
||||||
|
"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__(path)
|
||||||
|
self.lexicon: IndexDirectoryContext = IndexDirectoryContext(
|
||||||
|
os.path.join(self.path, 'lexicon'),
|
||||||
|
LexiconConfigDirectoryContext)
|
||||||
|
self.user: IndexDirectoryContext = IndexDirectoryContext(
|
||||||
|
os.path.join(self.path, 'user'),
|
||||||
|
UserConfigDirectoryContext)
|
||||||
|
|
||||||
|
|
||||||
|
class LexiconConfigDirectoryContext(ConfigFileConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A config context for a lexicon's config directory.
|
||||||
|
"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__(path)
|
||||||
|
self.draft: ConfigDirectoryContext = ConfigDirectoryContext(
|
||||||
|
os.path.join(self.path, 'draft'))
|
||||||
|
self.src: ConfigDirectoryContext = ConfigDirectoryContext(
|
||||||
|
os.path.join(self.path, 'src'))
|
||||||
|
self.article: ConfigDirectoryContext = ConfigDirectoryContext(
|
||||||
|
os.path.join(self.path, 'article'))
|
||||||
|
|
||||||
|
|
||||||
|
class UserConfigDirectoryContext(ConfigFileConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A config context for a user's config directory.
|
||||||
|
"""
|
|
@ -1,119 +0,0 @@
|
||||||
# Standard library imports
|
|
||||||
from collections import OrderedDict
|
|
||||||
import fcntl
|
|
||||||
import json
|
|
||||||
|
|
||||||
# Module imports
|
|
||||||
from amanuensis.errors import ReadOnlyError
|
|
||||||
|
|
||||||
|
|
||||||
class AttrOrderedDict(OrderedDict):
|
|
||||||
"""An ordered dictionary with access via __getattr__"""
|
|
||||||
def __getattr__(self, key):
|
|
||||||
if key not in self:
|
|
||||||
raise AttributeError(key)
|
|
||||||
return self[key]
|
|
||||||
|
|
||||||
def __setattr__(self, key, value):
|
|
||||||
if key not in self:
|
|
||||||
raise AttributeError(key)
|
|
||||||
self[key] = value
|
|
||||||
|
|
||||||
def new(self, key, value):
|
|
||||||
"""Setter for adding new keys"""
|
|
||||||
if key in self:
|
|
||||||
raise KeyError("Key already exists: '{}'".format(key))
|
|
||||||
self[key] = value
|
|
||||||
|
|
||||||
|
|
||||||
class ReadOnlyOrderedDict(OrderedDict):
|
|
||||||
"""An ordered dictionary that cannot be modified"""
|
|
||||||
def __readonly__(self, *args, **kwargs):
|
|
||||||
raise ReadOnlyError("Cannot modify a ReadOnlyOrderedDict")
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super(ReadOnlyOrderedDict, self).__init__(*args, **kwargs)
|
|
||||||
self.__setitem__ = self.__readonly__
|
|
||||||
self.__delitem__ = self.__readonly__
|
|
||||||
self.pop = self.__readonly__
|
|
||||||
self.popitem = self.__readonly__
|
|
||||||
self.clear = self.__readonly__
|
|
||||||
self.update = self.__readonly__
|
|
||||||
self.setdefault = self.__readonly__
|
|
||||||
|
|
||||||
def __getattr__(self, key):
|
|
||||||
if key not in self:
|
|
||||||
raise AttributeError(key)
|
|
||||||
return self[key]
|
|
||||||
|
|
||||||
|
|
||||||
class open_lock():
|
|
||||||
"""A context manager that opens a file with the specified file lock"""
|
|
||||||
def __init__(self, path, mode, lock_type):
|
|
||||||
self.fd = open(path, mode, encoding='utf8')
|
|
||||||
fcntl.lockf(self.fd, lock_type)
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
return self.fd
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
|
||||||
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
|
||||||
self.fd.close()
|
|
||||||
|
|
||||||
|
|
||||||
class open_sh(open_lock):
|
|
||||||
"""A context manager that opens a file with a shared lock"""
|
|
||||||
def __init__(self, path, mode):
|
|
||||||
super().__init__(path, mode, fcntl.LOCK_SH)
|
|
||||||
|
|
||||||
|
|
||||||
class open_ex(open_lock):
|
|
||||||
"""A context manager that opens a file with an exclusive lock"""
|
|
||||||
def __init__(self, path, mode):
|
|
||||||
super().__init__(path, mode, fcntl.LOCK_EX)
|
|
||||||
|
|
||||||
|
|
||||||
class json_ro(open_sh):
|
|
||||||
"""
|
|
||||||
A context manager that opens a file in a shared, read-only mode.
|
|
||||||
The contents of the file are read as JSON and returned as a read-
|
|
||||||
only OrderedDict.
|
|
||||||
"""
|
|
||||||
def __init__(self, path):
|
|
||||||
super().__init__(path, 'r')
|
|
||||||
self.config = None
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict)
|
|
||||||
return self.config
|
|
||||||
|
|
||||||
|
|
||||||
class json_rw(open_ex):
|
|
||||||
"""
|
|
||||||
A context manager that opens a file with an exclusive lock. The
|
|
||||||
file mode defaults to r+, which requires that the file exist. The
|
|
||||||
file mode can be set to w+ to create a new file by setting the new
|
|
||||||
kwarg in the ctor. The contents of the file are read as JSON and
|
|
||||||
returned in an AttrOrderedDict. Any changes to the context dict
|
|
||||||
will be written out to the file when the context manager exits.
|
|
||||||
"""
|
|
||||||
def __init__(self, path, new=False):
|
|
||||||
mode = 'w+' if new else 'r+'
|
|
||||||
super().__init__(path, mode)
|
|
||||||
self.config = None
|
|
||||||
self.new = new
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
if not self.new:
|
|
||||||
self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict)
|
|
||||||
else:
|
|
||||||
self.config = AttrOrderedDict()
|
|
||||||
return self.config
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
|
||||||
# Only write the new value out if there wasn't an exception
|
|
||||||
if not exc_type:
|
|
||||||
self.fd.seek(0)
|
|
||||||
json.dump(self.config, self.fd, allow_nan=False, indent='\t')
|
|
||||||
self.fd.truncate()
|
|
||||||
super().__exit__(exc_type, exc_value, traceback)
|
|
Loading…
Reference in New Issue