From 9cfa568eff91d582475d9290cf18a7abe3db6ec9 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Thu, 30 Jan 2025 16:04:31 -0800 Subject: [PATCH] Pass state directly through Execute instead of statePath --- cmd/actionExecute.go | 15 ++++++++++- cmd/sourceFetch.go | 13 +++++++-- cmd/sourceTest.go | 5 ++-- core/action.go | 63 +++++++++++++++++++++++++++++--------------- core/action_test.go | 27 ++++++++++++++++--- core/source.go | 9 +++++-- core/source_test.go | 28 ++++++++++---------- web/item.go | 11 +++++++- 8 files changed, 124 insertions(+), 47 deletions(-) diff --git a/cmd/actionExecute.go b/cmd/actionExecute.go index bfbd1f9..4dd6ef8 100644 --- a/cmd/actionExecute.go +++ b/cmd/actionExecute.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "log" + "slices" "time" "github.com/Jaculabilis/intake/core" @@ -80,6 +81,11 @@ func actionExecute( db := openAndMigrateDb() + state, err := core.GetState(db, source) + if err != nil { + log.Fatalf("error: failed to load state for %s: %v", source, err) + } + item, err := core.GetItem(db, source, itemId) if err != nil { log.Fatalf("error: failed to get item: %v", err) @@ -98,7 +104,7 @@ func actionExecute( log.Fatalf("error: failed to get action: %v", err) } - newItem, err := core.ExecuteItemAction(item, argv, nil, "", time.Minute) + newItem, newState, err := core.ExecuteItemAction(item, argv, nil, state, time.Minute) if err != nil { log.Fatalf("error executing %s: %v", action, err) } @@ -122,6 +128,9 @@ func actionExecute( if core.ItemsAreEqual(item, newItem) { log.Printf("no changes\n") } + if !slices.Equal(state, newState) { + log.Printf("state changed (%d => %d bytes)", len(state), len(newState)) + } } if dryRun { @@ -132,4 +141,8 @@ func actionExecute( if err = core.UpdateItems(db, []core.Item{newItem}); err != nil { log.Fatalf("error: failed to update item: %v", err) } + + if err = core.SetState(db, source, newState); err != nil { + log.Fatalf("error: failed to set state for %s: %v", source, err) + } } diff --git a/cmd/sourceFetch.go b/cmd/sourceFetch.go index 395f3e5..567bdf6 100644 --- a/cmd/sourceFetch.go +++ b/cmd/sourceFetch.go @@ -3,6 +3,7 @@ package cmd import ( "fmt" "log" + "slices" "time" "github.com/Jaculabilis/intake/core" @@ -42,25 +43,33 @@ func sourceFetch(source string, format string, dryRun bool) { db := openAndMigrateDb() + state, err := core.GetState(db, source) + if err != nil { + log.Fatalf("error: failed to load state for %s: %v", source, err) + } + argv, err := core.GetArgvForAction(db, source, "fetch") if err != nil { log.Fatalf("error: failed to get fetch action: %v", err) } - items, err := core.Execute(source, argv, nil, "", "", time.Minute) + items, newState, err := core.Execute(source, argv, nil, state, "", time.Minute) if err != nil { log.Fatalf("error: failed to execute fetch: %v", err) } if dryRun { log.Printf("Fetch returned %d items", len(items)) + if !slices.Equal(state, newState) { + log.Printf("State update (%d => %d bytes)", len(state), len(newState)) + } for _, item := range items { fmt.Println(formatter(item)) } return } - added, deleted, err := core.UpdateWithFetchedItems(db, source, items) + added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items) if err != nil { log.Fatalf("error: failed to update: %v", err) } diff --git a/cmd/sourceTest.go b/cmd/sourceTest.go index ca81a59..62a47c4 100644 --- a/cmd/sourceTest.go +++ b/cmd/sourceTest.go @@ -30,8 +30,9 @@ func init() { func sourceTest(env []string, format string, cmd []string) { formatter := formatAs(format) - items, err := core.Execute("test", cmd, env, "", "", time.Minute) - log.Printf("Returned %d items", len(items)) + items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute) + log.Printf("returned %d items", len(items)) + log.Printf("wrote %d bytes of state", len(state)) if err != nil { log.Fatal(err) } diff --git a/core/action.go b/core/action.go index 25b537d..c33a5ba 100644 --- a/core/action.go +++ b/core/action.go @@ -127,20 +127,35 @@ func Execute( source string, argv []string, env []string, - statePath string, + state []byte, input string, timeout time.Duration, -) ([]Item, error) { +) ([]Item, []byte, error) { log.Printf("executing %v", argv) if len(argv) == 0 { - return nil, errors.New("empty argv") + return nil, nil, errors.New("empty argv") } if source == "" { - return nil, errors.New("empty source") + return nil, nil, errors.New("empty source") } - env = append(env, "STATE_PATH="+statePath) + stateFile, err := os.CreateTemp("", "intake_state_*") + if err != nil { + return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err) + } + defer func() { + if err := os.Remove(stateFile.Name()); err != nil { + log.Printf("error: failed to delete %s", stateFile.Name()) + } + }() + + _, err = stateFile.Write(state) + if err != nil { + return nil, nil, fmt.Errorf("error: failed to write state file: %v", err) + } + + env = append(env, "STATE_PATH="+stateFile.Name()) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -151,15 +166,15 @@ func Execute( // Open pipes to the command stdin, err := cmd.StdinPipe() if err != nil { - return nil, err + return nil, nil, err } stdout, err := cmd.StdoutPipe() if err != nil { - return nil, err + return nil, nil, err } stderr, err := cmd.StderrPipe() if err != nil { - return nil, err + return nil, nil, err } cout := make(chan Item) @@ -182,7 +197,7 @@ func Execute( // Kick off the command err = cmd.Start() if err != nil { - return nil, err + return nil, nil, err } // Block until std{out,err} close @@ -192,45 +207,51 @@ func Execute( err = cmd.Wait() if ctx.Err() == context.DeadlineExceeded { log.Printf("Timed out after %v\n", timeout) - return nil, err + return nil, nil, err } else if exiterr, ok := err.(*exec.ExitError); ok { log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) - return nil, err + return nil, nil, err } else if err != nil { log.Printf("error: %s failed with error: %s\n", argv[0], err) - return nil, err + return nil, nil, err } if parseError { log.Printf("error: could not parse item\n") - return nil, errors.New("invalid JSON") + return nil, nil, errors.New("invalid JSON") } - return items, nil + newState, err := os.ReadFile(stateFile.Name()) + if err != nil { + return nil, nil, fmt.Errorf("error: failed to read state file: %v", err) + } + + return items, newState, nil } // Execute an action that takes an item as input and returns the item modified. +// This is basically just a wrapper over [Execute] that handles the input and backfilling. func ExecuteItemAction( item Item, argv []string, env []string, - statePath string, + state []byte, timeout time.Duration, -) (Item, error) { +) (Item, []byte, error) { itemJson, err := json.Marshal(item) if err != nil { - return Item{}, fmt.Errorf("failed to serialize item: %v", err) + return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) } - res, err := Execute(item.Source, argv, env, statePath, string(itemJson), timeout) + res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout) if err != nil { - return Item{}, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) + return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) } if len(res) != 1 { - return Item{}, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) + return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) } newItem := res[0] BackfillItem(&newItem, &item) - return newItem, nil + return newItem, newState, nil } diff --git a/core/action_test.go b/core/action_test.go index 6e96124..9a33f04 100644 --- a/core/action_test.go +++ b/core/action_test.go @@ -72,7 +72,8 @@ func TestExecute(t *testing.T) { } } execute := func(argv []string) ([]Item, error) { - return Execute("_", argv, nil, "", "", time.Minute) + item, _, err := Execute("_", argv, nil, nil, "", time.Minute) + return item, err } res, err := execute([]string{"true"}) @@ -89,7 +90,7 @@ func TestExecute(t *testing.T) { assertLen(res, 0) // Timeout - res, err = Execute("_", []string{"sleep", "10"}, nil, "", "", time.Millisecond) + res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond) assertNotNil(err) assertLen(res, 0) @@ -102,7 +103,7 @@ func TestExecute(t *testing.T) { } // Read from stdin - res, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, "", "bar", time.Minute) + res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute) assertNil(err) assertLen(res, 1) if res[0].Id != "bar" { @@ -110,7 +111,7 @@ func TestExecute(t *testing.T) { } // Set env - res, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, "", "", time.Minute) + res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute) assertNil(err) assertLen(res, 1) if res[0].Id != "baz" { @@ -179,4 +180,22 @@ func TestExecute(t *testing.T) { if res[0].Action["hello"] == nil { t.Fatal("missing hello action") } + + // Read state + argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} + res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute) + assertNil(err) + assertLen(res, 1) + if res[0].Title != "Hello world" { + t.Fatalf("expected 'Hello world' from read state, got '%s'", res[0].Title) + } + + // Write state + argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} + res, newState, err := Execute("_", argv, nil, nil, "", time.Minute) + assertNil(err) + assertLen(res, 1) + if string(newState) != "Hello world" { + t.Fatalf("expected 'Hello world' from write state, got %s", string(newState)) + } } diff --git a/core/source.go b/core/source.go index d7521f7..de97c99 100644 --- a/core/source.go +++ b/core/source.go @@ -253,7 +253,7 @@ func SetState(db *DB, source string, state []byte) error { // Given the results of a fetch, add new items, update existing items, and delete expired items. // // Returns the number of new and deleted items on success. -func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) { +func UpdateWithFetchedItems(db *DB, source string, state []byte, items []Item) (int, int, error) { // Get the existing items existingItems, err := GetAllItemsForSource(db, source) if err != nil { @@ -296,7 +296,8 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro if err == nil { var updatedNewItems []Item for _, item := range newItems { - updatedItem, err := ExecuteItemAction(item, onCreateArgv, nil, "", time.Minute) + var updatedItem Item + updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, nil, state, time.Minute) if err != nil { log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) } @@ -330,5 +331,9 @@ func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, erro } } + if err = SetState(db, source, state); err != nil { + return 0, 0, err + } + return len(newItems), len(idsToDelete), nil } diff --git a/core/source_test.go b/core/source_test.go index 3aae051..053f1d3 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -128,18 +128,18 @@ func TestUpdateSourceAddAndDelete(t *testing.T) { } a := Item{Source: "test", Id: "a"} - add, del, err := UpdateWithFetchedItems(db, "test", []Item{a}) + add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a}) if add != 1 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } - add, del, err = UpdateWithFetchedItems(db, "test", []Item{a}) + add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } b := Item{Source: "test", Id: "b"} - add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) + add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) if add != 1 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } @@ -147,17 +147,17 @@ func TestUpdateSourceAddAndDelete(t *testing.T) { if _, err = DeactivateItem(db, "test", "a"); err != nil { t.Fatal(err) } - add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) + add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } - add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) + add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) if add != 0 || del != 1 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } - add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) + add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b}) if add != 0 || del != 0 || err != nil { t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) } @@ -173,7 +173,7 @@ func TestOnCreateAction(t *testing.T) { } execute := func(argv []string) []Item { - items, err := Execute("test", argv, nil, "", "", time.Minute) + items, _, err := Execute("test", argv, nil, nil, "", time.Minute) if err != nil { t.Fatal("unexpected error executing test fetch") } @@ -200,7 +200,7 @@ func TestOnCreateAction(t *testing.T) { // Noop on_create works onCreate([]string{"tee"}) items := execute([]string{"jq", "-cn", `{id: "one"}`}) - add, _, err := UpdateWithFetchedItems(db, "test", items) + add, _, err := UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with noop oncreate") } @@ -216,7 +216,7 @@ func TestOnCreateAction(t *testing.T) { if items[0].Title != "Hello, World" { t.Fatal("unexpected title") } - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with alter oncreate") } @@ -231,7 +231,7 @@ func TestOnCreateAction(t *testing.T) { if items[0].Link != "" { t.Fatal("unexpected link") } - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with augment oncreate") } @@ -246,7 +246,7 @@ func TestOnCreateAction(t *testing.T) { if items[0].Link != "gopher://go.dev" { t.Fatal("missing link") } - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with attempted deletion oncreate") } @@ -257,7 +257,7 @@ func TestOnCreateAction(t *testing.T) { // item is created if on_create fails onCreate([]string{"false"}) items = execute([]string{"jq", "-cn", `{id: "five"}`}) - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with failing oncreate") } @@ -271,7 +271,7 @@ func TestOnCreateAction(t *testing.T) { if items[0].Title != "before" { t.Fatal("unexpected title") } - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with bad exit code oncreate") } @@ -282,7 +282,7 @@ func TestOnCreateAction(t *testing.T) { // on_create can't change id, active, or created onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`}) items = execute([]string{"jq", "-cn", `{id: "seven"}`}) - add, _, err = UpdateWithFetchedItems(db, "test", items) + add, _, err = UpdateWithFetchedItems(db, "test", nil, items) if add != 1 || err != nil { t.Fatal("failed update with invalid field changes oncreate") } diff --git a/web/item.go b/web/item.go index 1b44c3f..4a18698 100644 --- a/web/item.go +++ b/web/item.go @@ -44,6 +44,11 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) { id := req.PathValue("id") action := req.PathValue("action") + state, err := core.GetState(env.db, source) + if err != nil { + log.Fatalf("error: failed to load state for %s: %v", source, err) + } + item, err := core.GetItem(env.db, source, id) if err != nil { http.Error(writer, err.Error(), 500) @@ -61,7 +66,7 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) { return } - newItem, err := core.ExecuteItemAction(item, argv, nil, "", time.Minute) + newItem, newState, err := core.ExecuteItemAction(item, argv, nil, state, time.Minute) if err != nil { http.Error(writer, err.Error(), 500) return @@ -72,6 +77,10 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) { return } + if err = core.SetState(env.db, source, newState); err != nil { + log.Fatalf("error: failed to set state for %s: %v", source, err) + } + html.Item(writer, html.ItemData{Item: newItem}) }