Add some basic unit tests

This commit is contained in:
Tim Van Baak 2023-06-04 15:31:08 -07:00
parent 9407b53c7a
commit 5e89ba35cd
7 changed files with 141 additions and 12 deletions

29
tests/conftest.py Normal file
View File

@ -0,0 +1,29 @@
from pathlib import Path
from typing import List, Callable
import pytest
from intake.source import LocalSource
def clean_source(source_path: Path):
for item in source_path.iterdir():
if item.name.endswith(".item"):
item.unlink()
(source_path / "state").unlink(missing_ok=True)
@pytest.fixture
def using_source() -> Callable:
test_data = Path(__file__).parent
sources: List[Path] = []
def _using_source(name: str):
source_path = test_data / name
clean_source(source_path)
sources.append(source_path)
return LocalSource(test_data, name)
yield _using_source
for source_path in sources:
clean_source(source_path)

View File

@ -0,0 +1,7 @@
{
"action": {
"fetch": {
"exe": "true"
}
}
}

View File

@ -9,14 +9,18 @@ args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
print(json.dumps({
"id": "updateme",
"title": "The count is at 1",
"action": {
"increment": 1,
"decrement": "",
}
}))
print(
json.dumps(
{
"id": "updateme",
"title": "The count is at 1",
"action": {
"increment": 1,
"decrement": "",
},
}
)
)
if args.action == "increment":
item = sys.stdin.readline()

View File

@ -6,10 +6,7 @@ import sys
import time
greeting = os.environ.get("HELLO", "MISSING")
item = json.dumps({
"id": "helloworld",
"title": "Hello = " + greeting
})
item = json.dumps({"id": "helloworld", "title": "Hello = " + greeting})
sys.stdout.write(item[:10])
sys.stdout.flush()

View File

@ -0,0 +1,8 @@
{
"action": {
"fetch": {
"exe": "./update.py",
"args": ["fetch"]
}
}
}

21
tests/test_inbox/update.py Executable file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""
A test source that "generates" items by returning them from the state file.
This source exists for unit testing so that unit tests can easily manipulate
what items are returned by writing them to the state file directly.
"""
import argparse, json, os, sys
parser = argparse.ArgumentParser()
parser.add_argument("action")
args = parser.parse_args()
print("args:", args, file=sys.stderr, flush=True)
if args.action == "fetch":
state_path = os.environ.get("STATE_PATH")
with open(state_path) as f:
state = json.load(f)
for item in state["inbox"]:
print(json.dumps(item))

63
tests/test_source.py Normal file
View File

@ -0,0 +1,63 @@
import json
from intake.source import fetch_items, update_items, LocalSource
def test_default_source(using_source):
source: LocalSource = using_source("default")
fetch = fetch_items(source)
assert len(fetch) == 0
def test_basic_lifecycle(using_source):
source: LocalSource = using_source("test_inbox")
state = {"inbox": [{"id": "first"}]}
source.get_state_path().write_text(json.dumps(state))
# The inboxed item is returned from fetch
fetch = fetch_items(source)
assert len(fetch) == 1
assert fetch[0]["id"] == "first"
# Update creates the item in the source
update_items(source, fetch)
assert source.get_item_path("first").exists()
assert source.get_item("first").get("active") == True
items = list(source.get_all_items())
assert len(items) == 1
assert items[0]["id"] == "first"
# A second fetch does not change anything
fetch = fetch_items(source)
update_items(source, fetch)
assert source.get_item_path("first").exists()
assert source.get_item("first").get("active") == True
items = list(source.get_all_items())
assert len(items) == 1
assert items[0]["id"] == "first"
# The item remains after it is no longer in the feed
state = {"inbox": [{"id": "second"}]}
source.get_state_path().write_text(json.dumps(state))
fetch = fetch_items(source)
update_items(source, fetch)
assert source.get_item_path("first").exists()
assert source.get_item("first").get("active") == True
assert source.get_item_path("second").exists()
assert source.get_item("second").get("active") == True
items = list(source.get_all_items())
assert len(items) == 2
assert sorted(map(lambda i: i["id"], items)) == ["first", "second"]
# The item is removed on the next update when it is inactive
first = source.get_item("first")
first["active"] = False
source.save_item(first)
fetch = fetch_items(source)
update_items(source, fetch)
assert not source.get_item_path("first").exists()
assert source.get_item_path("second").exists()
items = list(source.get_all_items())
assert len(items) == 1
assert items[0]["id"] == "second"