From 0c4177b783d04cc756842a16c10e93ccb68109ee Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Sat, 21 Sep 2024 14:55:10 -0700 Subject: [PATCH] Add on_create trigger --- intake/source.py | 14 +++++++++++++- tests/channels.json | 3 ++- tests/conftest.py | 1 + tests/demo_oncreate/intake.json | 22 ++++++++++++++++++++++ tests/demo_oncreate/update.py | 29 +++++++++++++++++++++++++++++ tests/test_source.py | 1 + 6 files changed, 68 insertions(+), 2 deletions(-) create mode 100644 tests/demo_oncreate/intake.json create mode 100755 tests/demo_oncreate/update.py diff --git a/intake/source.py b/intake/source.py index e544919..69ff033 100644 --- a/intake/source.py +++ b/intake/source.py @@ -330,6 +330,8 @@ def update_items(source: LocalSource, fetched_items: List[Item]): Update the source with a batch of new items, doing creations, updates, and deletions as necessary. """ + config = source.get_config() + # Get a list of item ids that already existed for this source. prior_ids = source.get_item_ids() print(f"Found {len(prior_ids)} prior items", file=sys.stderr) @@ -345,9 +347,19 @@ def update_items(source: LocalSource, fetched_items: List[Item]): # Write all the new items to the source directory. for item in new_items: - # TODO: support on-create trigger source.save_item(item) + # If the source has an on-create trigger, run it for new items + if "on_create" in config.get("action", {}): + for item in new_items: + try: + execute_action(source, item["id"], "on_create") + except Exception as ex: + print( + f"on_create failed for {source.source_name}/{item['id']}:\n{ex}", + file=sys.stderr, + ) + # Update the other items using the fetched items' values. for upd_item in upd_items: old_item = source.get_item(upd_item["id"]) diff --git a/tests/channels.json b/tests/channels.json index 0d2c6bc..6da5797 100644 --- a/tests/channels.json +++ b/tests/channels.json @@ -2,6 +2,7 @@ "demo": [ "demo_basic_callback", "demo_logging", - "demo_raw_sh" + "demo_raw_sh", + "demo_oncreate" ] } \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 1f7412e..0303692 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -23,6 +23,7 @@ def using_source() -> Callable: clean_source(source_path) sources.append(source_path) return LocalSource(test_data, name) + yield _using_source for source_path in sources: diff --git a/tests/demo_oncreate/intake.json b/tests/demo_oncreate/intake.json new file mode 100644 index 0000000..6bfbaa1 --- /dev/null +++ b/tests/demo_oncreate/intake.json @@ -0,0 +1,22 @@ +{ + "action": { + "fetch": { + "exe": "./update.py", + "args": [ + "fetch" + ] + }, + "update": { + "exe": "./update.py", + "args": [ + "update" + ] + }, + "on_create": { + "exe": "./update.py", + "args": [ + "update" + ] + } + } +} \ No newline at end of file diff --git a/tests/demo_oncreate/update.py b/tests/demo_oncreate/update.py new file mode 100755 index 0000000..73c9a40 --- /dev/null +++ b/tests/demo_oncreate/update.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python3 + +import argparse, json, sys, time + +parser = argparse.ArgumentParser() +parser.add_argument("action") +args = parser.parse_args() + +print("args:", args, file=sys.stderr, flush=True) + +if args.action == "fetch": + print( + json.dumps( + { + "id": str(int(time.time())), + "title": "Title has not been updated", + "action": { + "update": 0, + }, + } + ) + ) + +if args.action == "update" or args.action == "on_create": + item = sys.stdin.readline() + item = json.loads(item) + item["action"]["update"] += 1 + item["title"] = f"Updated {item['action']['update']} times" + print(json.dumps(item)) diff --git a/tests/test_source.py b/tests/test_source.py index ba439e8..2fe215d 100644 --- a/tests/test_source.py +++ b/tests/test_source.py @@ -8,6 +8,7 @@ def test_default_source(using_source): fetch = fetch_items(source) assert len(fetch) == 0 + def test_basic_lifecycle(using_source): source: LocalSource = using_source("test_inbox") state = {"inbox": [{"id": "first"}]}