Add on_create triggers
This commit is contained in:
parent
79dbea50c2
commit
f153263bc4
@ -14,7 +14,7 @@ Parity with existing Python version
|
||||
* [x] create and delete items
|
||||
* [x] update existing items
|
||||
* [ ] support item TTL and TTD
|
||||
* [ ] on_create triggers
|
||||
* [x] on_create triggers
|
||||
* [ ] on_delete triggers
|
||||
* [x] dry-run
|
||||
* item actions
|
||||
|
@ -17,7 +17,14 @@ modifications made by the action, on stdout.
|
||||
Items declare support for an action by having an "action" key containing an
|
||||
object with a key for every supported action. The value of that key may be
|
||||
any arbitrary JSON value. Use --force to execute an unsupported action anyway,
|
||||
though the action may fail if it operated on the item's action data.
|
||||
though the action may fail if it operates on the item's action data.
|
||||
|
||||
The special action "on_create" is always run when an item is first returned
|
||||
by a fetch. The item does not need to declare support for "on_create". This
|
||||
action is not accessible through the web interface, so if you need to retry
|
||||
the action, you need another action with the same command as "on_create".
|
||||
If an item's "on_create" fails, the item is still created, but without any
|
||||
changes from the "on_create", if any.
|
||||
|
||||
To execute the "fetch" action, use "intake source fetch".`,
|
||||
}
|
||||
|
@ -2,8 +2,11 @@ package core
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
@ -277,6 +280,31 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
// If the source has an on-create trigger, run it for each new item
|
||||
// On-create errors are ignored to avoid failing the fetch
|
||||
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
|
||||
if err == nil {
|
||||
var updatedNewItems []Item
|
||||
for _, item := range newItems {
|
||||
itemJson, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to serialize item: %v", err)
|
||||
}
|
||||
res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err)
|
||||
continue
|
||||
}
|
||||
if len(res) != 1 {
|
||||
log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res))
|
||||
}
|
||||
updatedItem := res[0]
|
||||
BackfillItem(&updatedItem, &item)
|
||||
updatedNewItems = append(updatedNewItems, updatedItem)
|
||||
}
|
||||
UpdateItems(db, updatedNewItems)
|
||||
}
|
||||
|
||||
// Get the list of expired items
|
||||
fetchedIds := map[string]bool{}
|
||||
for _, item := range items {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
@ -160,3 +161,131 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOnCreateAction(t *testing.T) {
|
||||
db := EphemeralDb(t)
|
||||
if err := AddSource(db, "test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := AddAction(db, "test", "on_create", []string{"true"}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
execute := func(argv []string) []Item {
|
||||
items, err := Execute("test", argv, nil, "", time.Minute)
|
||||
if err != nil {
|
||||
t.Fatal("unexpected error executing test fetch")
|
||||
}
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("expected only one item, got %d", len(items))
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
onCreate := func(argv []string) {
|
||||
if err := UpdateAction(db, "test", "on_create", argv); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
getItem := func(id string) Item {
|
||||
item, err := GetItem(db, "test", id)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return item
|
||||
}
|
||||
|
||||
// Noop on_create works
|
||||
onCreate([]string{"tee"})
|
||||
items := execute([]string{"jq", "-cn", `{id: "one"}`})
|
||||
add, _, err := UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with noop oncreate")
|
||||
}
|
||||
updated := getItem("one")
|
||||
updated.Created = 0 // zero out for comparison with pre-insert item
|
||||
if updated != items[0] {
|
||||
t.Fatalf("expected no change: %#v != %#v", updated, items[0])
|
||||
}
|
||||
|
||||
// on_create can change a field
|
||||
onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`})
|
||||
if items[0].Title != "Hello, World" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with alter oncreate")
|
||||
}
|
||||
if getItem("two").Title != "Goodbye, World" {
|
||||
t.Fatal("title not updated")
|
||||
}
|
||||
|
||||
// on_create can add a field
|
||||
onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "three"}`})
|
||||
if items[0].Link != "" {
|
||||
t.Fatal("unexpected link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with augment oncreate")
|
||||
}
|
||||
if getItem("three").Link != "gopher://go.dev" {
|
||||
t.Fatal("link not added")
|
||||
}
|
||||
|
||||
// on_create can't delete a field using a zero value
|
||||
// due to zero values preserving prior field values
|
||||
onCreate([]string{"jq", "-c", `del(.link)`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`})
|
||||
if items[0].Link != "gopher://go.dev" {
|
||||
t.Fatal("missing link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with attempted deletion oncreate")
|
||||
}
|
||||
if getItem("four").Link != "gopher://go.dev" {
|
||||
t.Fatal("link unexpectedly removed")
|
||||
}
|
||||
|
||||
// item is created if on_create fails
|
||||
onCreate([]string{"false"})
|
||||
items = execute([]string{"jq", "-cn", `{id: "five"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with failing oncreate")
|
||||
}
|
||||
if getItem("five").Id != "five" {
|
||||
t.Fatal("item not created")
|
||||
}
|
||||
|
||||
// item isn't updated if on_create has valid output but a bad exit code
|
||||
onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`})
|
||||
if items[0].Title != "before" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with bad exit code oncreate")
|
||||
}
|
||||
if getItem("six").Title != "before" {
|
||||
t.Fatal("update applied after oncreate failed")
|
||||
}
|
||||
|
||||
// on_create can't change id, active, or created
|
||||
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with invalid field changes oncreate")
|
||||
}
|
||||
updated = getItem("seven")
|
||||
if updated.Id != "seven" || !updated.Active || updated.Created == 123456 {
|
||||
t.Fatal("unexpected changes to id, active, or created fields")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user