From f153263bc4f08afb640aff6fb31799992c96d9e6 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Fri, 24 Jan 2025 15:41:52 -0800 Subject: [PATCH] Add on_create triggers --- README.md | 2 +- cmd/action.go | 9 +++- core/source.go | 28 ++++++++++ core/source_test.go | 129 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 3687591..18a4e9b 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Parity with existing Python version * [x] create and delete items * [x] update existing items * [ ] support item TTL and TTD - * [ ] on_create triggers + * [x] on_create triggers * [ ] on_delete triggers * [x] dry-run * item actions diff --git a/cmd/action.go b/cmd/action.go index 27b31a0..e587e6a 100644 --- a/cmd/action.go +++ b/cmd/action.go @@ -17,7 +17,14 @@ modifications made by the action, on stdout. Items declare support for an action by having an "action" key containing an object with a key for every supported action. The value of that key may be any arbitrary JSON value. Use --force to execute an unsupported action anyway, -though the action may fail if it operated on the item's action data. +though the action may fail if it operates on the item's action data. + +The special action "on_create" is always run when an item is first returned +by a fetch. The item does not need to declare support for "on_create". This +action is not accessible through the web interface, so if you need to retry +the action, you need another action with the same command as "on_create". +If an item's "on_create" fails, the item is still created, but without any +changes from the "on_create", if any. To execute the "fetch" action, use "intake source fetch".`, } diff --git a/core/source.go b/core/source.go index 96c035b..95e5a22 100644 --- a/core/source.go +++ b/core/source.go @@ -2,8 +2,11 @@ package core import ( "database/sql" + "encoding/json" "errors" "fmt" + "log" + "time" _ "github.com/mattn/go-sqlite3" ) @@ -277,6 +280,31 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro return 0, 0, err } + // If the source has an on-create trigger, run it for each new item + // On-create errors are ignored to avoid failing the fetch + onCreateArgv, err := GetArgvForAction(db, source, "on_create") + if err == nil { + var updatedNewItems []Item + for _, item := range newItems { + itemJson, err := json.Marshal(item) + if err != nil { + log.Fatalf("error: failed to serialize item: %v", err) + } + res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute) + if err != nil { + log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err) + continue + } + if len(res) != 1 { + log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res)) + } + updatedItem := res[0] + BackfillItem(&updatedItem, &item) + updatedNewItems = append(updatedNewItems, updatedItem) + } + UpdateItems(db, updatedNewItems) + } + // Get the list of expired items fetchedIds := map[string]bool{} for _, item := range items { diff --git a/core/source_test.go b/core/source_test.go index 07bc8d0..5f04deb 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -4,6 +4,7 @@ import ( "fmt" "slices" "testing" + "time" _ "github.com/mattn/go-sqlite3" ) @@ -160,3 +161,131 @@ func TestUpdateSourceAddAndDelete(t *testing.T) { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } } + +func TestOnCreateAction(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "test"); err != nil { + t.Fatal(err) + } + if err := AddAction(db, "test", "on_create", []string{"true"}); err != nil { + t.Fatal(err) + } + + execute := func(argv []string) []Item { + items, err := Execute("test", argv, nil, "", time.Minute) + if err != nil { + t.Fatal("unexpected error executing test fetch") + } + if len(items) != 1 { + t.Fatalf("expected only one item, got %d", len(items)) + } + return items + } + + onCreate := func(argv []string) { + if err := UpdateAction(db, "test", "on_create", argv); err != nil { + t.Fatal(err) + } + } + + getItem := func(id string) Item { + item, err := GetItem(db, "test", id) + if err != nil { + t.Fatal(err) + } + return item + } + + // Noop on_create works + onCreate([]string{"tee"}) + items := execute([]string{"jq", "-cn", `{id: "one"}`}) + add, _, err := UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with noop oncreate") + } + updated := getItem("one") + updated.Created = 0 // zero out for comparison with pre-insert item + if updated != items[0] { + t.Fatalf("expected no change: %#v != %#v", updated, items[0]) + } + + // on_create can change a field + onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`}) + items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`}) + if items[0].Title != "Hello, World" { + t.Fatal("unexpected title") + } + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with alter oncreate") + } + if getItem("two").Title != "Goodbye, World" { + t.Fatal("title not updated") + } + + // on_create can add a field + onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`}) + items = execute([]string{"jq", "-cn", `{id: "three"}`}) + if items[0].Link != "" { + t.Fatal("unexpected link") + } + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with augment oncreate") + } + if getItem("three").Link != "gopher://go.dev" { + t.Fatal("link not added") + } + + // on_create can't delete a field using a zero value + // due to zero values preserving prior field values + onCreate([]string{"jq", "-c", `del(.link)`}) + items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`}) + if items[0].Link != "gopher://go.dev" { + t.Fatal("missing link") + } + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with attempted deletion oncreate") + } + if getItem("four").Link != "gopher://go.dev" { + t.Fatal("link unexpectedly removed") + } + + // item is created if on_create fails + onCreate([]string{"false"}) + items = execute([]string{"jq", "-cn", `{id: "five"}`}) + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with failing oncreate") + } + if getItem("five").Id != "five" { + t.Fatal("item not created") + } + + // item isn't updated if on_create has valid output but a bad exit code + onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`}) + items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`}) + if items[0].Title != "before" { + t.Fatal("unexpected title") + } + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with bad exit code oncreate") + } + if getItem("six").Title != "before" { + t.Fatal("update applied after oncreate failed") + } + + // on_create can't change id, active, or created + onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) + items = execute([]string{"jq", "-cn", `{id: "seven"}`}) + add, _, err = UpdateWithFetchedItems(db, "test", items) + if add != 1 || err != nil { + t.Fatal("failed update with invalid field changes oncreate") + } + updated = getItem("seven") + if updated.Id != "seven" || !updated.Active || updated.Created == 123456 { + t.Fatal("unexpected changes to id, active, or created fields") + } +}