Pass state directly through Execute instead of statePath

This commit is contained in:
Tim Van Baak 2025-01-30 16:04:31 -08:00
parent 5692f60318
commit 9cfa568eff
8 changed files with 124 additions and 47 deletions

View File

@ -3,6 +3,7 @@ package cmd
import ( import (
"fmt" "fmt"
"log" "log"
"slices"
"time" "time"
"github.com/Jaculabilis/intake/core" "github.com/Jaculabilis/intake/core"
@ -80,6 +81,11 @@ func actionExecute(
db := openAndMigrateDb() db := openAndMigrateDb()
state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
item, err := core.GetItem(db, source, itemId) item, err := core.GetItem(db, source, itemId)
if err != nil { if err != nil {
log.Fatalf("error: failed to get item: %v", err) log.Fatalf("error: failed to get item: %v", err)
@ -98,7 +104,7 @@ func actionExecute(
log.Fatalf("error: failed to get action: %v", err) log.Fatalf("error: failed to get action: %v", err)
} }
newItem, err := core.ExecuteItemAction(item, argv, nil, "", time.Minute) newItem, newState, err := core.ExecuteItemAction(item, argv, nil, state, time.Minute)
if err != nil { if err != nil {
log.Fatalf("error executing %s: %v", action, err) log.Fatalf("error executing %s: %v", action, err)
} }
@ -122,6 +128,9 @@ func actionExecute(
if core.ItemsAreEqual(item, newItem) { if core.ItemsAreEqual(item, newItem) {
log.Printf("no changes\n") log.Printf("no changes\n")
} }
if !slices.Equal(state, newState) {
log.Printf("state changed (%d => %d bytes)", len(state), len(newState))
}
} }
if dryRun { if dryRun {
@ -132,4 +141,8 @@ func actionExecute(
if err = core.UpdateItems(db, []core.Item{newItem}); err != nil { if err = core.UpdateItems(db, []core.Item{newItem}); err != nil {
log.Fatalf("error: failed to update item: %v", err) log.Fatalf("error: failed to update item: %v", err)
} }
if err = core.SetState(db, source, newState); err != nil {
log.Fatalf("error: failed to set state for %s: %v", source, err)
}
} }

View File

@ -3,6 +3,7 @@ package cmd
import ( import (
"fmt" "fmt"
"log" "log"
"slices"
"time" "time"
"github.com/Jaculabilis/intake/core" "github.com/Jaculabilis/intake/core"
@ -42,25 +43,33 @@ func sourceFetch(source string, format string, dryRun bool) {
db := openAndMigrateDb() db := openAndMigrateDb()
state, err := core.GetState(db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
argv, err := core.GetArgvForAction(db, source, "fetch") argv, err := core.GetArgvForAction(db, source, "fetch")
if err != nil { if err != nil {
log.Fatalf("error: failed to get fetch action: %v", err) log.Fatalf("error: failed to get fetch action: %v", err)
} }
items, err := core.Execute(source, argv, nil, "", "", time.Minute) items, newState, err := core.Execute(source, argv, nil, state, "", time.Minute)
if err != nil { if err != nil {
log.Fatalf("error: failed to execute fetch: %v", err) log.Fatalf("error: failed to execute fetch: %v", err)
} }
if dryRun { if dryRun {
log.Printf("Fetch returned %d items", len(items)) log.Printf("Fetch returned %d items", len(items))
if !slices.Equal(state, newState) {
log.Printf("State update (%d => %d bytes)", len(state), len(newState))
}
for _, item := range items { for _, item := range items {
fmt.Println(formatter(item)) fmt.Println(formatter(item))
} }
return return
} }
added, deleted, err := core.UpdateWithFetchedItems(db, source, items) added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items)
if err != nil { if err != nil {
log.Fatalf("error: failed to update: %v", err) log.Fatalf("error: failed to update: %v", err)
} }

View File

@ -30,8 +30,9 @@ func init() {
func sourceTest(env []string, format string, cmd []string) { func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(format) formatter := formatAs(format)
items, err := core.Execute("test", cmd, env, "", "", time.Minute) items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute)
log.Printf("Returned %d items", len(items)) log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -127,20 +127,35 @@ func Execute(
source string, source string,
argv []string, argv []string,
env []string, env []string,
statePath string, state []byte,
input string, input string,
timeout time.Duration, timeout time.Duration,
) ([]Item, error) { ) ([]Item, []byte, error) {
log.Printf("executing %v", argv) log.Printf("executing %v", argv)
if len(argv) == 0 { if len(argv) == 0 {
return nil, errors.New("empty argv") return nil, nil, errors.New("empty argv")
} }
if source == "" { if source == "" {
return nil, errors.New("empty source") return nil, nil, errors.New("empty source")
} }
env = append(env, "STATE_PATH="+statePath) stateFile, err := os.CreateTemp("", "intake_state_*")
if err != nil {
return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err)
}
defer func() {
if err := os.Remove(stateFile.Name()); err != nil {
log.Printf("error: failed to delete %s", stateFile.Name())
}
}()
_, err = stateFile.Write(state)
if err != nil {
return nil, nil, fmt.Errorf("error: failed to write state file: %v", err)
}
env = append(env, "STATE_PATH="+stateFile.Name())
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
@ -151,15 +166,15 @@ func Execute(
// Open pipes to the command // Open pipes to the command
stdin, err := cmd.StdinPipe() stdin, err := cmd.StdinPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
stderr, err := cmd.StderrPipe() stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
cout := make(chan Item) cout := make(chan Item)
@ -182,7 +197,7 @@ func Execute(
// Kick off the command // Kick off the command
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return nil, err return nil, nil, err
} }
// Block until std{out,err} close // Block until std{out,err} close
@ -192,45 +207,51 @@ func Execute(
err = cmd.Wait() err = cmd.Wait()
if ctx.Err() == context.DeadlineExceeded { if ctx.Err() == context.DeadlineExceeded {
log.Printf("Timed out after %v\n", timeout) log.Printf("Timed out after %v\n", timeout)
return nil, err return nil, nil, err
} else if exiterr, ok := err.(*exec.ExitError); ok { } else if exiterr, ok := err.(*exec.ExitError); ok {
log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode())
return nil, err return nil, nil, err
} else if err != nil { } else if err != nil {
log.Printf("error: %s failed with error: %s\n", argv[0], err) log.Printf("error: %s failed with error: %s\n", argv[0], err)
return nil, err return nil, nil, err
} }
if parseError { if parseError {
log.Printf("error: could not parse item\n") log.Printf("error: could not parse item\n")
return nil, errors.New("invalid JSON") return nil, nil, errors.New("invalid JSON")
} }
return items, nil newState, err := os.ReadFile(stateFile.Name())
if err != nil {
return nil, nil, fmt.Errorf("error: failed to read state file: %v", err)
}
return items, newState, nil
} }
// Execute an action that takes an item as input and returns the item modified. // Execute an action that takes an item as input and returns the item modified.
// This is basically just a wrapper over [Execute] that handles the input and backfilling.
func ExecuteItemAction( func ExecuteItemAction(
item Item, item Item,
argv []string, argv []string,
env []string, env []string,
statePath string, state []byte,
timeout time.Duration, timeout time.Duration,
) (Item, error) { ) (Item, []byte, error) {
itemJson, err := json.Marshal(item) itemJson, err := json.Marshal(item)
if err != nil { if err != nil {
return Item{}, fmt.Errorf("failed to serialize item: %v", err) return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
} }
res, err := Execute(item.Source, argv, env, statePath, string(itemJson), timeout) res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout)
if err != nil { if err != nil {
return Item{}, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
} }
if len(res) != 1 { if len(res) != 1 {
return Item{}, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
} }
newItem := res[0] newItem := res[0]
BackfillItem(&newItem, &item) BackfillItem(&newItem, &item)
return newItem, nil return newItem, newState, nil
} }

View File

@ -72,7 +72,8 @@ func TestExecute(t *testing.T) {
} }
} }
execute := func(argv []string) ([]Item, error) { execute := func(argv []string) ([]Item, error) {
return Execute("_", argv, nil, "", "", time.Minute) item, _, err := Execute("_", argv, nil, nil, "", time.Minute)
return item, err
} }
res, err := execute([]string{"true"}) res, err := execute([]string{"true"})
@ -89,7 +90,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0) assertLen(res, 0)
// Timeout // Timeout
res, err = Execute("_", []string{"sleep", "10"}, nil, "", "", time.Millisecond) res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond)
assertNotNil(err) assertNotNil(err)
assertLen(res, 0) assertLen(res, 0)
@ -102,7 +103,7 @@ func TestExecute(t *testing.T) {
} }
// Read from stdin // Read from stdin
res, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, "", "bar", time.Minute) res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "bar" { if res[0].Id != "bar" {
@ -110,7 +111,7 @@ func TestExecute(t *testing.T) {
} }
// Set env // Set env
res, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, "", "", time.Minute) res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "baz" { if res[0].Id != "baz" {
@ -179,4 +180,22 @@ func TestExecute(t *testing.T) {
if res[0].Action["hello"] == nil { if res[0].Action["hello"] == nil {
t.Fatal("missing hello action") t.Fatal("missing hello action")
} }
// Read state
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute)
assertNil(err)
assertLen(res, 1)
if res[0].Title != "Hello world" {
t.Fatalf("expected 'Hello world' from read state, got '%s'", res[0].Title)
}
// Write state
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute)
assertNil(err)
assertLen(res, 1)
if string(newState) != "Hello world" {
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
}
} }

View File

@ -253,7 +253,7 @@ func SetState(db *DB, source string, state []byte) error {
// Given the results of a fetch, add new items, update existing items, and delete expired items. // Given the results of a fetch, add new items, update existing items, and delete expired items.
// //
// Returns the number of new and deleted items on success. // Returns the number of new and deleted items on success.
func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) { func UpdateWithFetchedItems(db *DB, source string, state []byte, items []Item) (int, int, error) {
// Get the existing items // Get the existing items
existingItems, err := GetAllItemsForSource(db, source) existingItems, err := GetAllItemsForSource(db, source)
if err != nil { if err != nil {
@ -296,7 +296,8 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
if err == nil { if err == nil {
var updatedNewItems []Item var updatedNewItems []Item
for _, item := range newItems { for _, item := range newItems {
updatedItem, err := ExecuteItemAction(item, onCreateArgv, nil, "", time.Minute) var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, nil, state, time.Minute)
if err != nil { if err != nil {
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
} }
@ -330,5 +331,9 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro
} }
} }
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
return len(newItems), len(idsToDelete), nil return len(newItems), len(idsToDelete), nil
} }

View File

@ -128,18 +128,18 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
} }
a := Item{Source: "test", Id: "a"} a := Item{Source: "test", Id: "a"}
add, del, err := UpdateWithFetchedItems(db, "test", []Item{a}) add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
b := Item{Source: "test", Id: "b"} b := Item{Source: "test", Id: "b"}
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 1 || del != 0 || err != nil { if add != 1 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
@ -147,17 +147,17 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
if _, err = DeactivateItem(db, "test", "a"); err != nil { if _, err = DeactivateItem(db, "test", "a"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 1 || err != nil { if add != 0 || del != 1 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
if add != 0 || del != 0 || err != nil { if add != 0 || del != 0 || err != nil {
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
} }
@ -173,7 +173,7 @@ func TestOnCreateAction(t *testing.T) {
} }
execute := func(argv []string) []Item { execute := func(argv []string) []Item {
items, err := Execute("test", argv, nil, "", "", time.Minute) items, _, err := Execute("test", argv, nil, nil, "", time.Minute)
if err != nil { if err != nil {
t.Fatal("unexpected error executing test fetch") t.Fatal("unexpected error executing test fetch")
} }
@ -200,7 +200,7 @@ func TestOnCreateAction(t *testing.T) {
// Noop on_create works // Noop on_create works
onCreate([]string{"tee"}) onCreate([]string{"tee"})
items := execute([]string{"jq", "-cn", `{id: "one"}`}) items := execute([]string{"jq", "-cn", `{id: "one"}`})
add, _, err := UpdateWithFetchedItems(db, "test", items) add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with noop oncreate") t.Fatal("failed update with noop oncreate")
} }
@ -216,7 +216,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "Hello, World" { if items[0].Title != "Hello, World" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with alter oncreate") t.Fatal("failed update with alter oncreate")
} }
@ -231,7 +231,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "" { if items[0].Link != "" {
t.Fatal("unexpected link") t.Fatal("unexpected link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with augment oncreate") t.Fatal("failed update with augment oncreate")
} }
@ -246,7 +246,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Link != "gopher://go.dev" { if items[0].Link != "gopher://go.dev" {
t.Fatal("missing link") t.Fatal("missing link")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with attempted deletion oncreate") t.Fatal("failed update with attempted deletion oncreate")
} }
@ -257,7 +257,7 @@ func TestOnCreateAction(t *testing.T) {
// item is created if on_create fails // item is created if on_create fails
onCreate([]string{"false"}) onCreate([]string{"false"})
items = execute([]string{"jq", "-cn", `{id: "five"}`}) items = execute([]string{"jq", "-cn", `{id: "five"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with failing oncreate") t.Fatal("failed update with failing oncreate")
} }
@ -271,7 +271,7 @@ func TestOnCreateAction(t *testing.T) {
if items[0].Title != "before" { if items[0].Title != "before" {
t.Fatal("unexpected title") t.Fatal("unexpected title")
} }
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with bad exit code oncreate") t.Fatal("failed update with bad exit code oncreate")
} }
@ -282,7 +282,7 @@ func TestOnCreateAction(t *testing.T) {
// on_create can't change id, active, or created // on_create can't change id, active, or created
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
items = execute([]string{"jq", "-cn", `{id: "seven"}`}) items = execute([]string{"jq", "-cn", `{id: "seven"}`})
add, _, err = UpdateWithFetchedItems(db, "test", items) add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
if add != 1 || err != nil { if add != 1 || err != nil {
t.Fatal("failed update with invalid field changes oncreate") t.Fatal("failed update with invalid field changes oncreate")
} }

View File

@ -44,6 +44,11 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
id := req.PathValue("id") id := req.PathValue("id")
action := req.PathValue("action") action := req.PathValue("action")
state, err := core.GetState(env.db, source)
if err != nil {
log.Fatalf("error: failed to load state for %s: %v", source, err)
}
item, err := core.GetItem(env.db, source, id) item, err := core.GetItem(env.db, source, id)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
@ -61,7 +66,7 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
newItem, err := core.ExecuteItemAction(item, argv, nil, "", time.Minute) newItem, newState, err := core.ExecuteItemAction(item, argv, nil, state, time.Minute)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return return
@ -72,6 +77,10 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
if err = core.SetState(env.db, source, newState); err != nil {
log.Fatalf("error: failed to set state for %s: %v", source, err)
}
html.Item(writer, html.ItemData{Item: newItem}) html.Item(writer, html.ItemData{Item: newItem})
} }