From c9949b7b252e284129b4d3f5739c0eaa7b1defed Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Fri, 31 Jan 2025 15:00:05 -0800 Subject: [PATCH 1/2] Move item functions to items.go --- core/{source.go => items.go} | 0 core/{source_test.go => items_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename core/{source.go => items.go} (100%) rename core/{source_test.go => items_test.go} (100%) diff --git a/core/source.go b/core/items.go similarity index 100% rename from core/source.go rename to core/items.go diff --git a/core/source_test.go b/core/items_test.go similarity index 100% rename from core/source_test.go rename to core/items_test.go From 0337cc8ee303bd2fe83b68fe599a5c1e1645955a Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Fri, 31 Jan 2025 15:02:45 -0800 Subject: [PATCH 2/2] Reduce items.go to item functions --- core/items.go | 163 -------------------------- core/items_test.go | 282 --------------------------------------------- 2 files changed, 445 deletions(-) diff --git a/core/items.go b/core/items.go index 3eb0c5f..5e695dd 100644 --- a/core/items.go +++ b/core/items.go @@ -5,56 +5,10 @@ import ( "encoding/json" "errors" "fmt" - "log" - "time" _ "github.com/mattn/go-sqlite3" ) -func AddSource(db DB, name string) error { - _, err := db.Exec(` - insert into sources (name) - values (?) - `, name) - - return err -} - -func GetSources(db DB) ([]string, error) { - rows, err := db.Query(` - select name - from sources - `) - if err != nil { - return nil, err - } - var names []string - for rows.Next() { - var name string - if err = rows.Scan(&name); err != nil { - return nil, err - } - names = append(names, name) - } - return names, nil -} - -func SourceExists(db DB, source string) (bool, error) { - row := db.QueryRow("select count(*) from sources where name = ?", source) - var c int - err := row.Scan(&c) - return c > 0, err -} - -func DeleteSource(db DB, name string) error { - _, err := db.Exec(` - delete from sources - where name = ? - `, name) - - return err -} - func AddItems(db DB, items []Item) error { return db.Transact(func(tx DB) error { stmt, err := tx.Prepare(` @@ -244,120 +198,3 @@ func GetAllItemsForSource(db DB, source string) ([]Item, error) { order by case when time = 0 then created else time end, id `, source) } - -func GetState(db DB, source string) ([]byte, error) { - row := db.QueryRow("select state from sources where name = ?", source) - var state []byte - err := row.Scan(&state) - return state, err -} - -func SetState(db DB, source string, state []byte) error { - _, err := db.Exec("update sources set state = ? where name = ?", state, source) - return err -} - -// Given the results of a fetch, add new items, update existing items, and delete expired items. -// -// Returns the number of new and deleted items on success. -func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) { - var new int - var del int - var err error - err = db.Transact(func(tx DB) error { - new, del, err = updateWithFetchedItemsTx(tx, source, state, items) - return err - }) - return new, del, err -} - -// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction. -func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) { - // Get the existing items - existingItems, err := GetAllItemsForSource(db, source) - if err != nil { - return 0, 0, err - } - existingIds := map[string]bool{} - existingItemsById := map[string]*Item{} - for _, item := range existingItems { - existingIds[item.Id] = true - existingItemsById[item.Id] = &item - } - - // Split the fetch into adds and updates - var newItems []Item - var updatedItems []Item - for _, item := range items { - if existingIds[item.Id] { - updatedItems = append(updatedItems, item) - } else { - newItems = append(newItems, item) - } - } - - // Bulk insert the new items - if err = AddItems(db, newItems); err != nil { - return 0, 0, err - } - - // Bulk update the existing items - for _, item := range updatedItems { - BackfillItem(&item, existingItemsById[item.Id]) - } - if err = UpdateItems(db, updatedItems); err != nil { - return 0, 0, err - } - - envs, err := GetEnvs(db, source) - if err != nil { - return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err) - } - - // If the source has an on-create trigger, run it for each new item - // On-create errors are ignored to avoid failing the fetch - onCreateArgv, err := GetArgvForAction(db, source, "on_create") - if err == nil && len(onCreateArgv) > 0 { - var updatedNewItems []Item - for _, item := range newItems { - var updatedItem Item - updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute) - if err != nil { - log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) - } - updatedNewItems = append(updatedNewItems, updatedItem) - } - UpdateItems(db, updatedNewItems) - } - - // Get the list of expired items - fetchedIds := map[string]bool{} - for _, item := range items { - fetchedIds[item.Id] = true - } - expiredIds := map[string]bool{} - for id := range existingIds { - expiredIds[id] = !fetchedIds[id] - } - - // Check expired items for deletion - idsToDelete := map[string]bool{} - for _, item := range existingItems { - if expiredIds[item.Id] && item.Deletable() { - idsToDelete[item.Id] = true - } - } - - // Delete each item to be deleted - for id := range idsToDelete { - if _, err = DeleteItem(db, source, id); err != nil { - return 0, 0, err - } - } - - if err = SetState(db, source, state); err != nil { - return 0, 0, err - } - - return len(newItems), len(idsToDelete), nil -} diff --git a/core/items_test.go b/core/items_test.go index 520cb77..1ca07bc 100644 --- a/core/items_test.go +++ b/core/items_test.go @@ -1,51 +1,12 @@ package core import ( - "errors" "fmt" - "slices" - "strings" "testing" - "time" _ "github.com/mattn/go-sqlite3" ) -func TestCreateSource(t *testing.T) { - db := EphemeralDb(t) - - if exists, err := SourceExists(db, "one"); exists || err != nil { - t.Fatal(err) - } - if err := AddSource(db, "one"); err != nil { - t.Fatal(err) - } - if exists, err := SourceExists(db, "one"); !exists || err != nil { - t.Fatal(err) - } - - if err := AddSource(db, "two"); err != nil { - t.Fatal(err) - } - if err := AddSource(db, "three"); err != nil { - t.Fatal(err) - } - if err := DeleteSource(db, "two"); err != nil { - t.Fatal(err) - } - - names, err := GetSources(db) - if err != nil { - t.Fatal(err) - } - expected := []string{"one", "three"} - for i := 0; i < len(expected); i += 1 { - if !slices.Contains(names, expected[i]) { - t.Fatalf("missing %s, have: %v", expected[i], names) - } - } -} - func AssertItemIs(t *testing.T, item Item, expected string) { actual := fmt.Sprintf( "%s/%s/%t/%s/%s/%s/%s/%d", @@ -129,246 +90,3 @@ func TestAddItem(t *testing.T) { t.Fatal("should get one item") } } - -func TestUpdateSourceAddAndDelete(t *testing.T) { - db := EphemeralDb(t) - if err := AddSource(db, "test"); err != nil { - t.Fatal(err) - } - - a := Item{Source: "test", Id: "a"} - add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a}) - if add != 1 || del != 0 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } - - add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a}) - if add != 0 || del != 0 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } - - b := Item{Source: "test", Id: "b"} - add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) - if add != 1 || del != 0 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } - - if _, err = DeactivateItem(db, "test", "a"); err != nil { - t.Fatal(err) - } - add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) - if add != 0 || del != 0 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } - - add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) - if add != 0 || del != 1 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } - - add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) - if add != 0 || del != 0 || err != nil { - t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) - } -} - -func TestUpdateSourceTransaction(t *testing.T) { - db := EphemeralDb(t) - if err := AddSource(db, "s"); err != nil { - t.Fatal(err) - } - - a := Item{Source: "s", Id: "a"} - b := Item{Source: "s", Id: "b"} - - // Add and deactivate a so it will be deleted on next fetch without it - if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil { - t.Fatalf("expected 1 add, got %d and err %v", add, err) - } - if _, err := DeactivateItem(db, "s", "a"); err != nil { - t.Fatal(err) - } - - // Add b and cause a to be deleted, but the delete throws an error - fdb := &FailureDb{ - db: db, - execError: func(q string, a ...any) error { - if strings.Contains(q, "delete from") { - return errors.New("no deletes!") - } - return nil - }, - } - add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}) - if add != 0 || del != 0 || err == nil { - t.Fatalf("expected failure, got %d %d %v", add, del, err) - } - - // Failure should not add b - items, err := GetAllItemsForSource(db, "s") - if err != nil { - t.Fatal(err) - } - if len(items) != 1 { - t.Fatalf("should only have one item, got %d", len((items))) - } - if items[0].Id != "a" { - t.Fatalf("expected only item to still be a, got %s", items[0].Id) - } -} - -func TestOnCreateAction(t *testing.T) { - db := EphemeralDb(t) - if err := AddSource(db, "test"); err != nil { - t.Fatal(err) - } - if err := AddAction(db, "test", "on_create", []string{"true"}); err != nil { - t.Fatal(err) - } - - execute := func(argv []string) []Item { - items, _, err := Execute("test", argv, nil, nil, "", time.Minute) - if err != nil { - t.Fatal("unexpected error executing test fetch") - } - if len(items) != 1 { - t.Fatalf("expected only one item, got %d", len(items)) - } - return items - } - - onCreate := func(argv []string) { - if err := UpdateAction(db, "test", "on_create", argv); err != nil { - t.Fatal(err) - } - } - - getItem := func(id string) Item { - item, err := GetItem(db, "test", id) - if err != nil { - t.Fatal(err) - } - return item - } - - // Noop on_create works - onCreate([]string{"tee"}) - items := execute([]string{"jq", "-cn", `{id: "one"}`}) - add, _, err := UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with noop oncreate") - } - updated := getItem("one") - updated.Created = 0 // zero out for comparison with pre-insert item - if !ItemsAreEqual(updated, items[0]) { - t.Fatalf("expected no change: %#v != %#v", updated, items[0]) - } - - // on_create can change a field - onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`}) - items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`}) - if items[0].Title != "Hello, World" { - t.Fatal("unexpected title") - } - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with alter oncreate") - } - two := getItem("two") - if two.Title != "Goodbye, World" { - t.Fatalf("title not updated, is: %s", two.Title) - } - - // on_create can add a field - onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`}) - items = execute([]string{"jq", "-cn", `{id: "three"}`}) - if items[0].Link != "" { - t.Fatal("unexpected link") - } - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with augment oncreate") - } - if getItem("three").Link != "gopher://go.dev" { - t.Fatal("link not added") - } - - // on_create can't delete a field using a zero value - // due to zero values preserving prior field values - onCreate([]string{"jq", "-c", `del(.link)`}) - items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`}) - if items[0].Link != "gopher://go.dev" { - t.Fatal("missing link") - } - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with attempted deletion oncreate") - } - if getItem("four").Link != "gopher://go.dev" { - t.Fatal("link unexpectedly removed") - } - - // item is created if on_create fails - onCreate([]string{"false"}) - items = execute([]string{"jq", "-cn", `{id: "five"}`}) - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with failing oncreate") - } - if getItem("five").Id != "five" { - t.Fatal("item not created") - } - - // item isn't updated if on_create has valid output but a bad exit code - onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`}) - items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`}) - if items[0].Title != "before" { - t.Fatal("unexpected title") - } - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with bad exit code oncreate") - } - if getItem("six").Title != "before" { - t.Fatal("update applied after oncreate failed") - } - - // on_create can't change id, active, or created - onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) - items = execute([]string{"jq", "-cn", `{id: "seven"}`}) - add, _, err = UpdateWithFetchedItems(db, "test", nil, items) - if add != 1 || err != nil { - t.Fatal("failed update with invalid field changes oncreate") - } - updated = getItem("seven") - if updated.Id != "seven" || !updated.Active || updated.Created == 123456 { - t.Fatal("unexpected changes to id, active, or created fields") - } -} - -func TestSourceState(t *testing.T) { - db := EphemeralDb(t) - if err := AddSource(db, "s"); err != nil { - t.Fatal(err) - } - - state, err := GetState(db, "s") - if err != nil { - t.Fatal(err) - } - if len(state) != 0 { - t.Fatal("expected no state on a fresh source") - } - - if err = SetState(db, "s", []byte("hello, world")); err != nil { - t.Fatal(err) - } - - state, err = GetState(db, "s") - if err != nil { - t.Fatal(err) - } - if string(state) != "hello, world" { - t.Fatalf("expected hello, world, got %s", state) - } -}