intake/core/source_test.go

325 lines
8.3 KiB
Go
Raw Normal View History

2025-01-16 22:53:04 +00:00
package core
2025-01-16 19:46:37 +00:00
import (
2025-01-31 17:55:07 +00:00
"errors"
2025-01-16 21:46:30 +00:00
"slices"
2025-01-31 17:55:07 +00:00
"strings"
2025-01-16 19:46:37 +00:00
"testing"
2025-01-24 23:41:52 +00:00
"time"
2025-01-16 19:46:37 +00:00
_ "github.com/mattn/go-sqlite3"
)
2025-01-16 21:46:30 +00:00
func TestCreateSource(t *testing.T) {
db := EphemeralDb(t)
if exists, err := SourceExists(db, "one"); exists || err != nil {
t.Fatal(err)
}
2025-01-16 21:46:30 +00:00
if err := AddSource(db, "one"); err != nil {
t.Fatal(err)
}
if exists, err := SourceExists(db, "one"); !exists || err != nil {
t.Fatal(err)
}
2025-01-16 21:46:30 +00:00
if err := AddSource(db, "two"); err != nil {
t.Fatal(err)
}
if err := AddSource(db, "three"); err != nil {
t.Fatal(err)
}
if err := DeleteSource(db, "two"); err != nil {
t.Fatal(err)
}
2025-01-23 15:55:11 +00:00
names, err := GetSources(db)
2025-01-16 21:46:30 +00:00
if err != nil {
t.Fatal(err)
}
expected := []string{"one", "three"}
for i := 0; i < len(expected); i += 1 {
if !slices.Contains(names, expected[i]) {
t.Fatalf("missing %s, have: %v", expected[i], names)
}
}
}
2025-01-23 20:26:21 +00:00
func TestUpdateSourceAddAndDelete(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil {
t.Fatal(err)
}
2025-02-05 19:38:30 +00:00
update := func(items []Item) (int, int, error) {
t.Helper()
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
}
2025-01-23 20:26:21 +00:00
a := Item{Source: "test", Id: "a"}
2025-02-05 19:38:30 +00:00
add, del, err := update([]Item{a})
2025-01-23 20:26:21 +00:00
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
2025-02-05 19:38:30 +00:00
add, del, err = update([]Item{a})
2025-01-23 20:26:21 +00:00
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
b := Item{Source: "test", Id: "b"}
2025-02-05 19:38:30 +00:00
add, del, err = update([]Item{a, b})
2025-01-23 20:26:21 +00:00
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err)
}
2025-02-05 19:38:30 +00:00
add, del, err = update([]Item{a, b})
2025-01-23 20:26:21 +00:00
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
2025-02-05 19:38:30 +00:00
add, del, err = update([]Item{b})
2025-01-23 20:26:21 +00:00
if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
2025-02-05 19:38:30 +00:00
add, del, err = update([]Item{b})
2025-01-23 20:26:21 +00:00
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
}
2025-01-24 23:41:52 +00:00
2025-01-31 17:55:07 +00:00
func TestUpdateSourceTransaction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
a := Item{Source: "s", Id: "a"}
b := Item{Source: "s", Id: "b"}
// Add and deactivate a so it will be deleted on next fetch without it
2025-02-05 19:38:30 +00:00
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}, time.Now()); add != 1 || err != nil {
2025-01-31 17:55:07 +00:00
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
if _, err := DeactivateItem(db, "s", "a"); err != nil {
t.Fatal(err)
}
// Add b and cause a to be deleted, but the delete throws an error
fdb := &FailureDb{
db: db,
execError: func(q string, a ...any) error {
if strings.Contains(q, "delete from") {
return errors.New("no deletes!")
}
return nil
},
}
2025-02-05 19:38:30 +00:00
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}, time.Now())
2025-01-31 17:55:07 +00:00
if add != 0 || del != 0 || err == nil {
t.Fatalf("expected failure, got %d %d %v", add, del, err)
}
// Failure should not add b
items, err := GetAllItemsForSource(db, "s")
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("should only have one item, got %d", len((items)))
}
if items[0].Id != "a" {
t.Fatalf("expected only item to still be a, got %s", items[0].Id)
}
}
2025-01-24 23:41:52 +00:00
func TestOnCreateAction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil {
t.Fatal(err)
}
if err := AddAction(db, "test", "on_create", []string{"true"}); err != nil {
t.Fatal(err)
}
execute := func(argv []string) []Item {
2025-02-05 17:25:33 +00:00
t.Helper()
2025-02-05 21:21:31 +00:00
items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
2025-01-24 23:41:52 +00:00
if err != nil {
t.Fatal("unexpected error executing test fetch")
}
if len(items) != 1 {
t.Fatalf("expected only one item, got %d", len(items))
}
return items
}
2025-02-05 19:38:30 +00:00
update := func(items []Item) (int, int, error) {
t.Helper()
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
}
2025-01-24 23:41:52 +00:00
onCreate := func(argv []string) {
2025-02-05 17:25:33 +00:00
t.Helper()
2025-01-24 23:41:52 +00:00
if err := UpdateAction(db, "test", "on_create", argv); err != nil {
t.Fatal(err)
}
}
getItem := func(id string) Item {
2025-02-05 17:25:33 +00:00
t.Helper()
2025-01-24 23:41:52 +00:00
item, err := GetItem(db, "test", id)
if err != nil {
t.Fatal(err)
}
return item
}
// Noop on_create works
onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`})
2025-02-05 19:38:30 +00:00
add, _, err := update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate")
}
updated := getItem("one")
updated.Created = 0 // zero out for comparison with pre-insert item
if !ItemsAreEqual(updated, items[0]) {
2025-01-24 23:41:52 +00:00
t.Fatalf("expected no change: %#v != %#v", updated, items[0])
}
// on_create can change a field
onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`})
items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`})
if items[0].Title != "Hello, World" {
t.Fatal("unexpected title")
}
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate")
}
2025-01-29 16:48:12 +00:00
two := getItem("two")
if two.Title != "Goodbye, World" {
t.Fatalf("title not updated, is: %s", two.Title)
2025-01-24 23:41:52 +00:00
}
// on_create can add a field
onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`})
items = execute([]string{"jq", "-cn", `{id: "three"}`})
if items[0].Link != "" {
t.Fatal("unexpected link")
}
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate")
}
if getItem("three").Link != "gopher://go.dev" {
t.Fatal("link not added")
}
// on_create can't delete a field using a zero value
// due to zero values preserving prior field values
onCreate([]string{"jq", "-c", `del(.link)`})
items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`})
if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link")
}
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate")
}
if getItem("four").Link != "gopher://go.dev" {
t.Fatal("link unexpectedly removed")
}
// item is created if on_create fails
onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`})
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate")
}
if getItem("five").Id != "five" {
t.Fatal("item not created")
}
// item isn't updated if on_create has valid output but a bad exit code
onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`})
items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`})
if items[0].Title != "before" {
t.Fatal("unexpected title")
}
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate")
}
if getItem("six").Title != "before" {
t.Fatal("update applied after oncreate failed")
}
// on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
2025-02-05 19:38:30 +00:00
add, _, err = update(items)
2025-01-24 23:41:52 +00:00
if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate")
}
updated = getItem("seven")
if updated.Id != "seven" || !updated.Active || updated.Created == 123456 {
t.Fatal("unexpected changes to id, active, or created fields")
}
}
2025-01-30 23:22:57 +00:00
func TestSourceState(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
state, err := GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if len(state) != 0 {
t.Fatal("expected no state on a fresh source")
}
if err = SetState(db, "s", []byte("hello, world")); err != nil {
t.Fatal(err)
}
state, err = GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if string(state) != "hello, world" {
t.Fatalf("expected hello, world, got %s", state)
}
}
2025-02-05 21:21:31 +00:00
func TestSourceTtx(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_TTL=30",
"INTAKE_TTD=60",
"INTAKE_TTS=90",
}); err != nil {
t.Fatal(err)
}
postProcess, err := GetSourcePostProcessor(db, "s")
if err != nil {
t.Fatal(err)
}
before := Item{Source: "s", Id: "i"}
after := postProcess(before)
if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 {
t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts)
}
}