Add ConfigDirectoryContext framework
This commit is contained in:
parent
1e152851d0
commit
e22982e6ce
|
@ -0,0 +1,134 @@
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
|
from amanuensis.config.loader import json_ro, json_rw
|
||||||
|
from amanuensis.errors import MissingConfigError, ConfigAlreadyExistsError
|
||||||
|
|
||||||
|
|
||||||
|
def is_guid(s):
|
||||||
|
return re.match(r'[0-9a-z]{32}', s.lower())
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigDirectoryContext():
|
||||||
|
"""
|
||||||
|
Base class for CRUD operations on config files in a config
|
||||||
|
directory.
|
||||||
|
"""
|
||||||
|
def __init__(self, path):
|
||||||
|
self.path = path
|
||||||
|
if not os.path.isdir(self.path):
|
||||||
|
raise MissingConfigError(path)
|
||||||
|
|
||||||
|
def new(self, filename):
|
||||||
|
"""
|
||||||
|
Creates a JSON file that doesn't already exist.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath = os.path.join(self.path, filename)
|
||||||
|
if os.path.isfile(fpath):
|
||||||
|
raise ConfigAlreadyExistsError(fpath)
|
||||||
|
return json_rw(fpath, new=True)
|
||||||
|
|
||||||
|
def read(self, filename):
|
||||||
|
"""
|
||||||
|
Loads a JSON file in read-only mode.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath = os.path.join(self.path, filename)
|
||||||
|
if not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(path)
|
||||||
|
return json_ro(fpath)
|
||||||
|
|
||||||
|
def edit(self, filename):
|
||||||
|
"""
|
||||||
|
Loads a JSON file in write mode.
|
||||||
|
"""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath = os.path.join(self.path, filename)
|
||||||
|
if not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(path)
|
||||||
|
return json_rw(fpath, new=False)
|
||||||
|
|
||||||
|
def delete(self, filename):
|
||||||
|
"""Deletes a file."""
|
||||||
|
if not filename.endswith('.json'):
|
||||||
|
filename = f'{filename}.json'
|
||||||
|
fpath = os.path.join(self.path, filename)
|
||||||
|
if not os.path.isfile(fpath):
|
||||||
|
raise MissingConfigError(path)
|
||||||
|
os.delete(fpath)
|
||||||
|
|
||||||
|
|
||||||
|
class IndexDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A lookup layer for getting config directory contexts for lexicon
|
||||||
|
or user directories.
|
||||||
|
"""
|
||||||
|
def __init__(self, path, cdc_type):
|
||||||
|
super().__init__(path)
|
||||||
|
self.cdc_type = cdc_type
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
"""
|
||||||
|
Returns a context to the given item. key is treated as the
|
||||||
|
item's id if it's a guid string, otherwise it's treated as
|
||||||
|
the item's indexed name and run through the index first.
|
||||||
|
"""
|
||||||
|
if not is_guid(key):
|
||||||
|
with self.index() as index:
|
||||||
|
iid = index.get(key)
|
||||||
|
if not iid:
|
||||||
|
raise MissingConfigError(key)
|
||||||
|
key = iid
|
||||||
|
return self.cdc_type(os.path.join(self.path, key))
|
||||||
|
|
||||||
|
def index(self, edit=False):
|
||||||
|
if edit:
|
||||||
|
return self.edit('index')
|
||||||
|
else:
|
||||||
|
return self.read('index')
|
||||||
|
|
||||||
|
|
||||||
|
class RootConfigDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
Context for the config directory with links to the lexicon and
|
||||||
|
user contexts.
|
||||||
|
"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__(path)
|
||||||
|
self.lexicon = IndexDirectoryContext(
|
||||||
|
os.path.join(self.path, 'lexicon'),
|
||||||
|
LexiconConfigDirectoryContext)
|
||||||
|
self.user = IndexDirectoryContext(
|
||||||
|
os.path.join(self.path, 'user'),
|
||||||
|
UserConfigDirectoryContext)
|
||||||
|
|
||||||
|
|
||||||
|
class LexiconConfigDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A config context for a lexicon's config directory.
|
||||||
|
"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super().__init__(path)
|
||||||
|
self.draft = ConfigDirectoryContext(os.path.join(self.path, 'draft'))
|
||||||
|
self.src = ConfigDirectoryContext(os.path.join(self.path, 'src'))
|
||||||
|
|
||||||
|
def config(edit=False):
|
||||||
|
if edit:
|
||||||
|
return self.edit('config')
|
||||||
|
else:
|
||||||
|
return self.read('config')
|
||||||
|
|
||||||
|
|
||||||
|
class UserConfigDirectoryContext(ConfigDirectoryContext):
|
||||||
|
"""
|
||||||
|
A config context for a user's config directory.
|
||||||
|
"""
|
||||||
|
def config(edit=False):
|
||||||
|
if edit:
|
||||||
|
return self.edit('config')
|
||||||
|
else:
|
||||||
|
return self.read('config')
|
|
@ -46,7 +46,9 @@ class ReadOnlyOrderedDict(OrderedDict):
|
||||||
raise AttributeError(key)
|
raise AttributeError(key)
|
||||||
return self[key]
|
return self[key]
|
||||||
|
|
||||||
|
|
||||||
class open_lock():
|
class open_lock():
|
||||||
|
"""A context manager that opens a file with the specified file lock"""
|
||||||
def __init__(self, path, mode, lock_type):
|
def __init__(self, path, mode, lock_type):
|
||||||
self.fd = open(path, mode, encoding='utf8')
|
self.fd = open(path, mode, encoding='utf8')
|
||||||
fcntl.lockf(self.fd, lock_type)
|
fcntl.lockf(self.fd, lock_type)
|
||||||
|
@ -58,15 +60,25 @@ class open_lock():
|
||||||
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
fcntl.lockf(self.fd, fcntl.LOCK_UN)
|
||||||
self.fd.close()
|
self.fd.close()
|
||||||
|
|
||||||
|
|
||||||
class open_sh(open_lock):
|
class open_sh(open_lock):
|
||||||
|
"""A context manager that opens a file with a shared lock"""
|
||||||
def __init__(self, path, mode):
|
def __init__(self, path, mode):
|
||||||
super().__init__(path, mode, fcntl.LOCK_SH)
|
super().__init__(path, mode, fcntl.LOCK_SH)
|
||||||
|
|
||||||
|
|
||||||
class open_ex(open_lock):
|
class open_ex(open_lock):
|
||||||
|
"""A context manager that opens a file with an exclusive lock"""
|
||||||
def __init__(self, path, mode):
|
def __init__(self, path, mode):
|
||||||
super().__init__(path, mode, fcntl.LOCK_EX)
|
super().__init__(path, mode, fcntl.LOCK_EX)
|
||||||
|
|
||||||
|
|
||||||
class json_ro(open_sh):
|
class json_ro(open_sh):
|
||||||
|
"""
|
||||||
|
A context manager that opens a file in a shared, read-only mode.
|
||||||
|
The contents of the file are read as JSON and returned as a read-
|
||||||
|
only OrderedDict.
|
||||||
|
"""
|
||||||
def __init__(self, path):
|
def __init__(self, path):
|
||||||
super().__init__(path, 'r')
|
super().__init__(path, 'r')
|
||||||
self.config = None
|
self.config = None
|
||||||
|
@ -75,16 +87,32 @@ class json_ro(open_sh):
|
||||||
self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict)
|
self.config = json.load(self.fd, object_pairs_hook=ReadOnlyOrderedDict)
|
||||||
return self.config
|
return self.config
|
||||||
|
|
||||||
|
|
||||||
class json_rw(open_ex):
|
class json_rw(open_ex):
|
||||||
def __init__(self, path):
|
"""
|
||||||
super().__init__(path, 'r+')
|
A context manager that opens a file with an exclusive lock. The
|
||||||
|
file mode defaults to r+, which requires that the file exist. The
|
||||||
|
file mode can be set to w+ to create a new file by setting the new
|
||||||
|
kwarg in the ctor. The contents of the file are read as JSON and
|
||||||
|
returned in an AttrOrderedDict. Any changes to the context dict
|
||||||
|
will be written out to the file when the context manager exits.
|
||||||
|
"""
|
||||||
|
def __init__(self, path, new=False):
|
||||||
|
mode = 'w+' if new else 'r+'
|
||||||
|
super().__init__(path, mode)
|
||||||
self.config = None
|
self.config = None
|
||||||
|
self.new = new
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
|
if not self.new:
|
||||||
self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict)
|
self.config = json.load(self.fd, object_pairs_hook=AttrOrderedDict)
|
||||||
|
else:
|
||||||
|
self.config = AttrOrderedDict()
|
||||||
return self.config
|
return self.config
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
|
# Only write the enw value out if there wasn't an exception
|
||||||
|
if not exc_type:
|
||||||
self.fd.seek(0)
|
self.fd.seek(0)
|
||||||
json.dump(self.config, self.fd, allow_nan=False, indent='\t')
|
json.dump(self.config, self.fd, allow_nan=False, indent='\t')
|
||||||
self.fd.truncate()
|
self.fd.truncate()
|
||||||
|
|
|
@ -3,6 +3,15 @@ class AmanuensisError(Exception):
|
||||||
|
|
||||||
class MissingConfigError(AmanuensisError):
|
class MissingConfigError(AmanuensisError):
|
||||||
"""A config file is missing that was expected to be present"""
|
"""A config file is missing that was expected to be present"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super.__init__(self, "A config file or directory was expected to "
|
||||||
|
f"exist, but could not be found: {path}")
|
||||||
|
|
||||||
|
class ConfigAlreadyExistsError(AmanuensisError):
|
||||||
|
"""Attempted to create a config, but it already exists"""
|
||||||
|
def __init__(self, path):
|
||||||
|
super.__init__(self, "Attempted to create a config, but it already "
|
||||||
|
f"exists: {path}")
|
||||||
|
|
||||||
class MalformedConfigError(AmanuensisError):
|
class MalformedConfigError(AmanuensisError):
|
||||||
"""A config file could not be read and parsed"""
|
"""A config file could not be read and parsed"""
|
||||||
|
|
Loading…
Reference in New Issue