Reduce items.go to item functions

This commit is contained in:
Tim Van Baak 2025-01-31 15:02:45 -08:00
parent c9949b7b25
commit 0337cc8ee3
2 changed files with 0 additions and 445 deletions

View File

@ -5,56 +5,10 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"log"
"time"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
func AddSource(db DB, name string) error {
_, err := db.Exec(`
insert into sources (name)
values (?)
`, name)
return err
}
func GetSources(db DB) ([]string, error) {
rows, err := db.Query(`
select name
from sources
`)
if err != nil {
return nil, err
}
var names []string
for rows.Next() {
var name string
if err = rows.Scan(&name); err != nil {
return nil, err
}
names = append(names, name)
}
return names, nil
}
func SourceExists(db DB, source string) (bool, error) {
row := db.QueryRow("select count(*) from sources where name = ?", source)
var c int
err := row.Scan(&c)
return c > 0, err
}
func DeleteSource(db DB, name string) error {
_, err := db.Exec(`
delete from sources
where name = ?
`, name)
return err
}
func AddItems(db DB, items []Item) error { func AddItems(db DB, items []Item) error {
return db.Transact(func(tx DB) error { return db.Transact(func(tx DB) error {
stmt, err := tx.Prepare(` stmt, err := tx.Prepare(`
@ -244,120 +198,3 @@ func GetAllItemsForSource(db DB, source string) ([]Item, error) {
order by case when time = 0 then created else time end, id order by case when time = 0 then created else time end, id
`, source) `, source)
} }
func GetState(db DB, source string) ([]byte, error) {
row := db.QueryRow("select state from sources where name = ?", source)
var state []byte
err := row.Scan(&state)
return state, err
}
func SetState(db DB, source string, state []byte) error {
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
return err
}
// Given the results of a fetch, add new items, update existing items, and delete expired items.
//
// Returns the number of new and deleted items on success.
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
var new int
var del int
var err error
err = db.Transact(func(tx DB) error {
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
return err
})
return new, del, err
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
// Get the existing items
existingItems, err := GetAllItemsForSource(db, source)
if err != nil {
return 0, 0, err
}
existingIds := map[string]bool{}
existingItemsById := map[string]*Item{}
for _, item := range existingItems {
existingIds[item.Id] = true
existingItemsById[item.Id] = &item
}
// Split the fetch into adds and updates
var newItems []Item
var updatedItems []Item
for _, item := range items {
if existingIds[item.Id] {
updatedItems = append(updatedItems, item)
} else {
newItems = append(newItems, item)
}
}
// Bulk insert the new items
if err = AddItems(db, newItems); err != nil {
return 0, 0, err
}
// Bulk update the existing items
for _, item := range updatedItems {
BackfillItem(&item, existingItemsById[item.Id])
}
if err = UpdateItems(db, updatedItems); err != nil {
return 0, 0, err
}
envs, err := GetEnvs(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
}
// If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
if err == nil && len(onCreateArgv) > 0 {
var updatedNewItems []Item
for _, item := range newItems {
var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
if err != nil {
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
}
updatedNewItems = append(updatedNewItems, updatedItem)
}
UpdateItems(db, updatedNewItems)
}
// Get the list of expired items
fetchedIds := map[string]bool{}
for _, item := range items {
fetchedIds[item.Id] = true
}
expiredIds := map[string]bool{}
for id := range existingIds {
expiredIds[id] = !fetchedIds[id]
}
// Check expired items for deletion
idsToDelete := map[string]bool{}
for _, item := range existingItems {
if expiredIds[item.Id] && item.Deletable() {
idsToDelete[item.Id] = true
}
}
// Delete each item to be deleted
for id := range idsToDelete {
if _, err = DeleteItem(db, source, id); err != nil {
return 0, 0, err
}
}
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
return len(newItems), len(idsToDelete), nil
}

View File

@ -1,51 +1,12 @@
package core package core
import ( import (
"errors"
"fmt" "fmt"
"slices"
"strings"
"testing" "testing"
"time"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
) )
func TestCreateSource(t *testing.T) {
db := EphemeralDb(t)
if exists, err := SourceExists(db, "one"); exists || err != nil {
t.Fatal(err)
}
if err := AddSource(db, "one"); err != nil {
t.Fatal(err)
}
if exists, err := SourceExists(db, "one"); !exists || err != nil {
t.Fatal(err)
}
if err := AddSource(db, "two"); err != nil {
t.Fatal(err)
}
if err := AddSource(db, "three"); err != nil {
t.Fatal(err)
}
if err := DeleteSource(db, "two"); err != nil {
t.Fatal(err)
}
names, err := GetSources(db)
if err != nil {
t.Fatal(err)
}
expected := []string{"one", "three"}
for i := 0; i < len(expected); i += 1 {
if !slices.Contains(names, expected[i]) {
t.Fatalf("missing %s, have: %v", expected[i], names)
}
}
}
func AssertItemIs(t *testing.T, item Item, expected string) { func AssertItemIs(t *testing.T, item Item, expected string) {
actual := fmt.Sprintf( actual := fmt.Sprintf(
"%s/%s/%t/%s/%s/%s/%s/%d", "%s/%s/%t/%s/%s/%s/%s/%d",
@ -129,246 +90,3 @@ func TestAddItem(t *testing.T) {
t.Fatal("should get one item") t.Fatal("should get one item")
} }
} }
func TestUpdateSourceAddAndDelete(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil {
t.Fatal(err)
}
a := Item{Source: "test", Id: "a"}
add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
b := Item{Source: "test", Id: "b"}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err)
}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
}
}
func TestUpdateSourceTransaction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
a := Item{Source: "s", Id: "a"}
b := Item{Source: "s", Id: "b"}
// Add and deactivate a so it will be deleted on next fetch without it
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
if _, err := DeactivateItem(db, "s", "a"); err != nil {
t.Fatal(err)
}
// Add b and cause a to be deleted, but the delete throws an error
fdb := &FailureDb{
db: db,
execError: func(q string, a ...any) error {
if strings.Contains(q, "delete from") {
return errors.New("no deletes!")
}
return nil
},
}
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
if add != 0 || del != 0 || err == nil {
t.Fatalf("expected failure, got %d %d %v", add, del, err)
}
// Failure should not add b
items, err := GetAllItemsForSource(db, "s")
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("should only have one item, got %d", len((items)))
}
if items[0].Id != "a" {
t.Fatalf("expected only item to still be a, got %s", items[0].Id)
}
}
func TestOnCreateAction(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "test"); err != nil {
t.Fatal(err)
}
if err := AddAction(db, "test", "on_create", []string{"true"}); err != nil {
t.Fatal(err)
}
execute := func(argv []string) []Item {
items, _, err := Execute("test", argv, nil, nil, "", time.Minute)
if err != nil {
t.Fatal("unexpected error executing test fetch")
}
if len(items) != 1 {
t.Fatalf("expected only one item, got %d", len(items))
}
return items
}
onCreate := func(argv []string) {
if err := UpdateAction(db, "test", "on_create", argv); err != nil {
t.Fatal(err)
}
}
getItem := func(id string) Item {
item, err := GetItem(db, "test", id)
if err != nil {
t.Fatal(err)
}
return item
}
// Noop on_create works
onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`})
add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate")
}
updated := getItem("one")
updated.Created = 0 // zero out for comparison with pre-insert item
if !ItemsAreEqual(updated, items[0]) {
t.Fatalf("expected no change: %#v != %#v", updated, items[0])
}
// on_create can change a field
onCreate([]string{"jq", "-c", `.title = "Goodbye, World"`})
items = execute([]string{"jq", "-cn", `{id: "two", title: "Hello, World"}`})
if items[0].Title != "Hello, World" {
t.Fatal("unexpected title")
}
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate")
}
two := getItem("two")
if two.Title != "Goodbye, World" {
t.Fatalf("title not updated, is: %s", two.Title)
}
// on_create can add a field
onCreate([]string{"jq", "-c", `.link = "gopher://go.dev"`})
items = execute([]string{"jq", "-cn", `{id: "three"}`})
if items[0].Link != "" {
t.Fatal("unexpected link")
}
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate")
}
if getItem("three").Link != "gopher://go.dev" {
t.Fatal("link not added")
}
// on_create can't delete a field using a zero value
// due to zero values preserving prior field values
onCreate([]string{"jq", "-c", `del(.link)`})
items = execute([]string{"jq", "-cn", `{id: "four", link: "gopher://go.dev"}`})
if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link")
}
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate")
}
if getItem("four").Link != "gopher://go.dev" {
t.Fatal("link unexpectedly removed")
}
// item is created if on_create fails
onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`})
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate")
}
if getItem("five").Id != "five" {
t.Fatal("item not created")
}
// item isn't updated if on_create has valid output but a bad exit code
onCreate([]string{"sh", "-c", `jq -cn '{id: "six", title: "after"}'; exit 1`})
items = execute([]string{"jq", "-cn", `{id: "six", title: "before"}`})
if items[0].Title != "before" {
t.Fatal("unexpected title")
}
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate")
}
if getItem("six").Title != "before" {
t.Fatal("update applied after oncreate failed")
}
// on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate")
}
updated = getItem("seven")
if updated.Id != "seven" || !updated.Active || updated.Created == 123456 {
t.Fatal("unexpected changes to id, active, or created fields")
}
}
func TestSourceState(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
state, err := GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if len(state) != 0 {
t.Fatal("expected no state on a fresh source")
}
if err = SetState(db, "s", []byte("hello, world")); err != nil {
t.Fatal(err)
}
state, err = GetState(db, "s")
if err != nil {
t.Fatal(err)
}
if string(state) != "hello, world" {
t.Fatalf("expected hello, world, got %s", state)
}
}