package core import ( "errors" "slices" "strings" "testing" "time" _ "github.com/mattn/go-sqlite3" ) func TestCreateSource(t *testing.T) { db := EphemeralDb(t) if exists, err := SourceExists(db, "one"); exists || err != nil { t.Fatal(err) } if err := AddSource(db, "one"); err != nil { t.Fatal(err) } if exists, err := SourceExists(db, "one"); !exists || err != nil { t.Fatal(err) } if err := AddSource(db, "two"); err != nil { t.Fatal(err) } if err := AddSource(db, "three"); err != nil { t.Fatal(err) } if err := DeleteSource(db, "two"); err != nil { t.Fatal(err) } names, err := GetSources(db) if err != nil { t.Fatal(err) } expected := []string{"one", "three"} for i := 0; i < len(expected); i += 1 { if !slices.Contains(names, expected[i]) { t.Fatalf("missing %s, have: %v", expected[i], names) } } } func TestUpdateSourceAddAndDelete(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "test"); err != nil { t.Fatal(err) } update := func(items []Item) (int, int, error) { t.Helper() return UpdateWithFetchedItems(db, "test", nil, items, time.Now()) } a := Item{Source: "test", Id: "a"} add, del, err := update([]Item{a}) if add != 1 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } add, del, err = update([]Item{a}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } b := Item{Source: "test", Id: "b"} add, del, err = update([]Item{a, b}) if add != 1 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } if _, err = DeactivateItem(db, "test", "a"); err != nil { t.Fatal(err) } add, del, err = update([]Item{a, b}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } add, del, err = update([]Item{b}) if add != 0 || del != 1 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } add, del, err = update([]Item{b}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } } func TestUpdateSourceTransaction(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { t.Fatal(err) } a := Item{Source: "s", Id: "a"} b := Item{Source: "s", Id: "b"} // Add and deactivate a so it will be deleted on next fetch without it if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}, time.Now()); add != 1 || err != nil { t.Fatalf("expected 1 add, got %d and err %v", add, err) } if _, err := DeactivateItem(db, "s", "a"); err != nil { t.Fatal(err) } // Add b and cause a to be deleted, but the delete throws an error fdb := &FailureDb{ db: db, execError: func(q string, a ...any) error { if strings.Contains(q, "delete from") { return errors.New("no deletes!") } return nil }, } add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}, time.Now()) if add != 0 || del != 0 || err == nil { t.Fatalf("expected failure, got %d %d %v", add, del, err) } // Failure should not add b items, err := GetAllItemsForSource(db, "s", 0, -1) if err != nil { t.Fatal(err) } if len(items) != 1 { t.Fatalf("should only have one item, got %d", len((items))) } if items[0].Id != "a" { t.Fatalf("expected only item to still be a, got %s", items[0].Id) } } func TestOnCreateAction(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "test"); err != nil { t.Fatal(err) } if err := SetAction(db, "test", "on_create", []string{"true"}); err != nil { t.Fatal(err) } execute := func(argv []string) []Item { t.Helper() items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil) if err != nil { t.Fatalf("unexpected error executing test fetch: %v", err) } if len(items) != 1 { t.Fatalf("expected only one item, got %d", len(items)) } return items } update := func(items []Item) (int, int, error) { t.Helper() return UpdateWithFetchedItems(db, "test", nil, items, time.Now()) } onCreate := func(argv []string) { t.Helper() if err := SetAction(db, "test", "on_create", argv); err != nil { t.Fatal(err) } } getItem := func(id string) Item { t.Helper() item, err := GetItem(db, "test", id) if err != nil { t.Fatal(err) } return item } // Noop on_create works onCreate([]string{"tee"}) items := execute([]string{"jq", "-cn", `{id: "one"}`}) add, _, err := update(items) if add != 1 || err != nil { t.Fatalf("failed update with noop oncreate: %v", err) } updated := getItem("one") updated.Created = 0 // zero out for comparison with pre-insert item if !ItemsAreEqual(updated, items[0]) { t.Fatalf("expected no change: %#v != %#v", updated, items[0]) } // on_create can change a field onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`}) items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`}) if items[0].Title != "Hello, World" { t.Fatal("unexpected title") } add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with alter oncreate") } two := getItem("two") if two.Title != "Goodbye, World" { t.Fatalf("title not updated, is: %s", two.Title) } // on_create can add a field onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`}) items = execute([]string{"jq", "-cn", `{id: "three"}`}) if items[0].Link != "" { t.Fatal("unexpected link") } add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with augment oncreate") } if getItem("three").Link != "gopher://go.dev" { t.Fatal("link not added") } // on_create can't delete a field using a zero value // due to zero values preserving prior field values onCreate([]string{"jq", "-c", `del(.link)`}) items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`}) if items[0].Link != "gopher://go.dev" { t.Fatal("missing link") } add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with attempted deletion oncreate") } if getItem("four").Link != "gopher://go.dev" { t.Fatal("link unexpectedly removed") } // item is created if on_create fails onCreate([]string{"false"}) items = execute([]string{"jq", "-cn", `{id: "five"}`}) add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with failing oncreate") } if getItem("five").Id != "five" { t.Fatal("item not created") } // item isn't updated if on_create has valid output but a bad exit code onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`}) items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`}) if items[0].Title != "before" { t.Fatal("unexpected title") } add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with bad exit code oncreate") } if getItem("six").Title != "before" { t.Fatal("update applied after oncreate failed") } // on_create can't change id, active, or created onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) items = execute([]string{"jq", "-cn", `{id: "seven"}`}) add, _, err = update(items) if add != 1 || err != nil { t.Fatal("failed update with invalid field changes oncreate") } updated = getItem("seven") if updated.Id != "seven" || !updated.Active || updated.Created == 123456 { t.Fatal("unexpected changes to id, active, or created fields") } } func TestSourceState(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { t.Fatal(err) } state, err := GetState(db, "s") if err != nil { t.Fatal(err) } if len(state) != 0 { t.Fatal("expected no state on a fresh source") } if err = SetState(db, "s", []byte("hello, world")); err != nil { t.Fatal(err) } state, err = GetState(db, "s") if err != nil { t.Fatal(err) } if string(state) != "hello, world" { t.Fatalf("expected hello, world, got %s", state) } } func TestSourceTtx(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { t.Fatal(err) } if err := SetEnvs(db, "s", []string{ "INTAKE_TTL=30", "INTAKE_TTD=60", "INTAKE_TTS=90", }); err != nil { t.Fatal(err) } postProcess, err := GetSourcePostProcessor(db, "s") if err != nil { t.Fatal(err) } before := Item{Source: "s", Id: "i"} after := postProcess(before) if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 { t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts) } } func TestSourceLastUpdated(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { t.Fatal(err) } updated, err := GetLastUpdated(db, "s") if err != nil { t.Fatalf("failed to get lastUpdated: %v", err) } if updated != time.Unix(0, 0).UTC() { t.Fatalf("expected epoch time, got %v", updated) } now := time.Now().UTC().Round(time.Second) _, _, err = UpdateWithFetchedItems(db, "s", nil, nil, now) if err != nil { t.Fatal(err) } updated, err = GetLastUpdated(db, "s") if err != nil { t.Fatalf("failed to get lastUpdated: %v", err) } if updated != now { t.Fatalf("incorrect last updated time\nnow: %v\ngot: %v", now, updated) } }