Add test command to verify provider attributes

This commit is contained in:
Tim Van Baak 2022-08-12 21:14:08 -07:00
parent 59fdc5b355
commit 1379e5f8a3
3 changed files with 158 additions and 7 deletions

View File

@ -1 +1 @@
from .provider import Config, Setting
from .provider import Setting, BaseSettings

View File

@ -1,8 +1,99 @@
from argparse import ArgumentParser, RawDescriptionHelpFormatter, REMAINDER
import argparse
import inspect
import os
from signal import signal, SIGPIPE, SIG_DFL
import sys
from intake.provider import BaseSettings, load_provider
def command_test(args):
"""Check for errors or misconfigurations."""
parser = argparse.ArgumentParser(
prog="intake test",
description=command_test.__doc__)
parser.add_argument("--provider",
nargs="+",
help="Providers to test.",
metavar="name",
default=[])
parser.add_argument("--path",
nargs="+",
help="Additional paths to add to INTAKEPATH",
metavar="path",
type=os.path.abspath,
default=[])
args = parser.parse_args(args)
search_path = args.path
if args.provider:
print("INTAKEPATH:")
for path in search_path:
print(f" {path}")
for provider_name in args.provider:
print(f"Checking provider {provider_name}")
provider = load_provider(search_path, provider_name)
if not provider:
print(" x Not found")
continue
# Settings class
if not hasattr(provider, "Settings"):
print(" x Missing Settings class")
else:
settings = getattr(provider, "Settings")
if not issubclass(settings, BaseSettings):
print(" x Settings class does not inherit from intake.BaseSettings")
else:
print(" o Settings")
# update function
if not hasattr(provider, "Settings"):
print(" x Missing update(config, state)")
else:
update = getattr(provider, "update")
if not callable(update):
print(" x update is not callable")
else:
update_sig = inspect.signature(update)
if list(update_sig.parameters) != ["config", "state"]:
print(" x update does not have signature (config, state)")
else:
print(" o update")
# on-create hook
if hasattr(provider, "on_create"):
on_create = getattr(provider, "on_create")
if not callable(on_create):
print(" x on_create is not callable")
else:
create_sig = inspect.signature(on_create)
if list(create_sig.parameters) != ["config", "state", "item"]:
print(" x on_create does not have signature (config, state, item)")
else:
print(" o on_create")
# on-delete hook
if hasattr(provider, "on_delete"):
on_delete = getattr(provider, "on_delete")
if not callable(on_delete):
print(" x on_delete is not callable")
else:
delete_sig = inspect.signature(on_delete)
if list(delete_sig.parameters) != ["config", "state", "item"]:
print(" x on_delete does not have signature (config, state, item)")
else:
print(" o on_delete")
# actions
actions = [name for name in vars(provider) if name.startswith("action_")]
for action_name in actions:
action = getattr(provider, action_name)
if not callable(action):
print(f" x {action_name} is not callable")
else:
action_sig = inspect.signature(action)
if list(action_sig.parameters) != ["config", "state", "item"]:
print(f" x {action_name} does not have signature (config, state, item)")
else:
print(f" o {action_name}")
print("Done")
def command_help(args):
"""Print this help message and exit."""
@ -27,10 +118,9 @@ def main():
for name, func in commands.items()])
# Set up the top-level parser
parser = ArgumentParser(
parser = argparse.ArgumentParser(
description=f"Available commands:\n{descriptions}\n",
formatter_class=RawDescriptionHelpFormatter,
add_help=False)
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("command",
nargs="?",
default="help",

View File

@ -1,3 +1,8 @@
import importlib.util
import os
import sys
class SettingMissingError(Exception):
"""
No value was provided for a required setting.
@ -17,9 +22,9 @@ class Setting:
self.value = default
class Config:
class BaseSettings:
"""
Base class for provider configs.
Base class for provider settings.
"""
name = Setting(required=True)
@ -37,3 +42,59 @@ class Config:
missing.append(setting_name)
if missing:
raise SettingMissingError(missing)
class chdir:
"""
A context manager that changes the working directory inside the context.
"""
def __init__(self, path):
self.cwd = os.getcwd()
os.chdir(path)
def __enter__(self):
pass
def __exit__(self, *args):
os.chdir(self.cwd)
class add_to_sys_path:
"""
A context manager that adds a path to sys.path, allowing imports from it.
"""
def __init__(self, path):
self.path = path
self.not_present = path not in sys.path
if self.not_present:
sys.path.insert(0, path)
def __enter__(self):
pass
def __exit__(self, *args):
if self.not_present:
sys.path.remove(self.path)
def load_provider(search_paths, provider_name):
"""
Load a provider on the search path. If the provider cannot be found,
return None.
"""
for search_path in search_paths:
with chdir(search_path), add_to_sys_path(search_path):
# Check if the provider is on this path.
provider_filename = f"{provider_name}.py"
if not os.path.isfile(provider_filename):
continue
# Import the provider by file path.
spec = importlib.util.spec_from_file_location(provider_name, provider_filename)
provider_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(provider_module)
provider = importlib.import_module(provider_name)
return provider
return None