import json from pathlib import Path import tempfile from intake.source import fetch_items, update_items, LocalSource def test_default_source(using_source): source: LocalSource = using_source("default") fetch = fetch_items(source) assert len(fetch) == 0 def test_basic_lifecycle(using_source): source: LocalSource = using_source("test_inbox") state = {"inbox": [{"id": "first"}]} source.get_state_path().write_text(json.dumps(state)) # The inboxed item is returned from fetch fetch = fetch_items(source) assert len(fetch) == 1 assert fetch[0]["id"] == "first" # Update creates the item in the source update_items(source, fetch) assert source.get_item_path("first").exists() assert source.get_item("first").get("active") == True items = list(source.get_all_items()) assert len(items) == 1 assert items[0]["id"] == "first" # A second fetch does not change anything fetch = fetch_items(source) update_items(source, fetch) assert source.get_item_path("first").exists() assert source.get_item("first").get("active") == True items = list(source.get_all_items()) assert len(items) == 1 assert items[0]["id"] == "first" # The item remains after it is no longer in the feed state = {"inbox": [{"id": "second"}]} source.get_state_path().write_text(json.dumps(state)) fetch = fetch_items(source) update_items(source, fetch) assert source.get_item_path("first").exists() assert source.get_item("first").get("active") == True assert source.get_item_path("second").exists() assert source.get_item("second").get("active") == True items = list(source.get_all_items()) assert len(items) == 2 assert sorted(map(lambda i: i["id"], items)) == ["first", "second"] # The item is removed on the next update when it is inactive first = source.get_item("first") first["active"] = False source.save_item(first) fetch = fetch_items(source) update_items(source, fetch) assert not source.get_item_path("first").exists() assert source.get_item_path("second").exists() items = list(source.get_all_items()) assert len(items) == 1 assert items[0]["id"] == "second" def test_batch(): with tempfile.TemporaryDirectory() as data_dir: root = Path(data_dir) source_dir = root / "batching" source_dir.mkdir() config_file = source_dir / "intake.json" sh_args = [ "python", "-c", "import random; print(f'{{\"id\":\"{random.randrange(16**16):016x}\"}}')" ] batch_config = { "action": { "fetch": { "args": sh_args } }, "batch": 0 } config_file.write_text(json.dumps(batch_config)) source = LocalSource(root, source_dir.name) # batch sets the tts fetch1 = fetch_items(source) assert len(fetch1) == 1 update_items(source, fetch1) item1 = source.get_item(fetch1[0]["id"]) assert "tts" in item1._item batch_config["batch"] = 3600 config_file.write_text(json.dumps(batch_config)) fetch2 = fetch_items(source) assert len(fetch2) == 1 update_items(source, fetch2) item2 = source.get_item(fetch2[0]["id"]) assert "tts" in item2._item assert item1["id"] != item2["id"] assert item2.tts_at == item1.tts_at + 3600