From 8d9dcec0387e4f2e436ea1135d69a7335faafc1f Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Mon, 10 Feb 2025 08:22:19 -0800 Subject: [PATCH] Add error item on action failure --- README.md | 2 +- cmd/actionExecute.go | 3 +- cmd/sourceFetch.go | 3 +- cmd/sourceTest.go | 2 +- core/error.go | 22 +++++++ core/execute.go | 137 +++++++++++++++++++++++++++++++++---------- core/execute_test.go | 33 ++++++++--- core/source.go | 4 +- core/source_test.go | 2 +- test/test_items.sh | 16 +++-- web/item.go | 3 +- web/source.go | 3 +- 12 files changed, 179 insertions(+), 51 deletions(-) create mode 100644 core/error.go diff --git a/README.md b/README.md index 457fb6e..3868e49 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ Future features * [ ] manual item edits, web * [ ] source-level TTS * [ ] metric reporting -* [ ] on action failure, create an error item with logs +* [x] on action failure, create an error item with logs * [ ] items gracefully add new fields and `action` keys * [ ] arbitrary date punt * [ ] sort crontab entries diff --git a/cmd/actionExecute.go b/cmd/actionExecute.go index 880aeb2..807b25b 100644 --- a/cmd/actionExecute.go +++ b/cmd/actionExecute.go @@ -99,8 +99,9 @@ func actionExecute( } } - newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) + newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) if err != nil { + core.AddErrorItem(db, errItem) log.Fatalf("error executing %s: %v", action, err) } diff --git a/cmd/sourceFetch.go b/cmd/sourceFetch.go index be55ebc..8ba611d 100644 --- a/cmd/sourceFetch.go +++ b/cmd/sourceFetch.go @@ -48,8 +48,9 @@ func sourceFetch(source string, format string, dryRun bool) { log.Fatalf("error: failed to load data for %s: %v", source, err) } - items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) + items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) if err != nil { + core.AddErrorItem(db, errItem) log.Fatalf("error: failed to execute fetch: %v", err) } diff --git a/cmd/sourceTest.go b/cmd/sourceTest.go index 116d743..50c87f9 100644 --- a/cmd/sourceTest.go +++ b/cmd/sourceTest.go @@ -33,7 +33,7 @@ func init() { func sourceTest(env []string, format string, cmd []string) { formatter := formatAs(format) - items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil) + items, state, _, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil) log.Printf("returned %d items", len(items)) log.Printf("wrote %d bytes of state", len(state)) if err != nil { diff --git a/core/error.go b/core/error.go new file mode 100644 index 0000000..254152e --- /dev/null +++ b/core/error.go @@ -0,0 +1,22 @@ +package core + +import "log" + +func AddErrorItem(db DB, item Item) { + exists, err := SourceExists(db, "default") + if err != nil { + log.Printf("error: could not ensure default source: %v", err) + return + } + if !exists { + if err = AddSource(db, "default"); err != nil { + log.Printf("error: could not add default source: %v", err) + return + } + } + item.Source = "default" + err = AddItems(db, []Item{item}) + if err != nil { + log.Printf("error: could not add error item: %v", err) + } +} diff --git a/core/execute.go b/core/execute.go index 378326b..9696c33 100644 --- a/core/execute.go +++ b/core/execute.go @@ -4,7 +4,6 @@ import ( "bufio" "context" "encoding/json" - "errors" "fmt" "io" "log" @@ -29,6 +28,22 @@ func readPipe( } } +// Two-stage error item helper, one stage to capture the immutable parameters and a second +// to include the variable error and logs. +func makeMakeErrItem(source string, argv []string) func(err error, logs []string) (item Item) { + return func(err error, logs []string) (item Item) { + item.Source = "default" + item.Id = RandomHex(16) + item.Title = fmt.Sprintf("Failed to execute for %s", source) + log := "no logs" + if len(logs) > 0 { + log = strings.Join(logs, "\n") + } + item.Body = fmt.Sprintf("

executing: %#v

error: %s

%s
", argv, err.Error(), log) + return + } +} + func Execute( source string, argv []string, @@ -40,20 +55,30 @@ func Execute( ) ( items []Item, newState []byte, + errItem Item, err error, ) { log.Printf("executing %v", argv) - if len(argv) == 0 { - return nil, nil, errors.New("empty argv") - } + makeErrorItem := makeMakeErrItem(source, argv) + var logs []string + if source == "" { - return nil, nil, errors.New("empty source") + err = fmt.Errorf("empty source") + errItem = makeErrorItem(err, logs) + return + } + if len(argv) == 0 { + err = fmt.Errorf("empty argv for %s", source) + errItem = makeErrorItem(err, logs) + return } stateFile, err := os.CreateTemp("", "intake_state_*") if err != nil { - return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err) + err = fmt.Errorf("failed to create temp state file: %v", err) + errItem = makeErrorItem(err, logs) + return } defer func() { if err := os.Remove(stateFile.Name()); err != nil { @@ -63,7 +88,9 @@ func Execute( _, err = stateFile.Write(state) if err != nil { - return nil, nil, fmt.Errorf("error: failed to write state file: %v", err) + err = fmt.Errorf("failed to write state file: %v", err) + errItem = makeErrorItem(err, logs) + return } env = append(env, "STATE_PATH="+stateFile.Name()) @@ -77,15 +104,21 @@ func Execute( // Open pipes to the command stdin, err := cmd.StdinPipe() if err != nil { - return nil, nil, err + err = fmt.Errorf("failed to open stdin pipe: %v", err) + errItem = makeErrorItem(err, logs) + return } stdout, err := cmd.StdoutPipe() if err != nil { - return nil, nil, err + err = fmt.Errorf("failed to open stdout pipe: %v", err) + errItem = makeErrorItem(err, logs) + return } stderr, err := cmd.StderrPipe() if err != nil { - return nil, nil, err + err = fmt.Errorf("failed to open stderr pipe: %v", err) + errItem = makeErrorItem(err, logs) + return } cout := make(chan []byte) @@ -100,12 +133,22 @@ func Execute( // Kick off the command err = cmd.Start() if err != nil { - return nil, nil, err + err = fmt.Errorf("failed to start execution: %v", err) + errItem = makeErrorItem(err, logs) + return } // Write any input to stdin and close it - io.WriteString(stdin, input) - stdin.Close() + if _, err = io.WriteString(stdin, input); err != nil { + err = fmt.Errorf("failed to write to stdin: %v", err) + errItem = makeErrorItem(err, logs) + return + } + if err = stdin.Close(); err != nil { + err = fmt.Errorf("failed to close stdin: %v", err) + errItem = makeErrorItem(err, logs) + return + } // Collect outputs until std{out,err} close parseError := false @@ -118,7 +161,9 @@ monitor: var item Item err := json.Unmarshal(data, &item) if err != nil || item.Id == "" { - log.Printf("[%s: stdout] %s", source, strings.TrimSpace(string(data))) + msg := fmt.Sprintf("[%s: stdout] %s", source, strings.TrimSpace(string(data))) + log.Print(msg) + logs = append(logs, msg) parseError = true } else { if postProcess != nil { @@ -127,12 +172,16 @@ monitor: item.Active = true // These fields aren't up to item.Created = 0 // the action to set and item.Source = source // shouldn't be overrideable - log.Printf("[%s: item] %s\n", source, item.Id) + msg := fmt.Sprintf("[%s: item] %s", source, item.Id) + log.Print(msg) + logs = append(logs, msg) items = append(items, item) } case data := <-cerr: - log.Printf("[%s: stderr] %s\n", source, data) + msg := fmt.Sprintf("[%s: stderr] %s", source, strings.TrimSpace(string(data))) + log.Print(msg) + logs = append(logs, msg) case <-coutDone: stdoutDone = true @@ -150,24 +199,35 @@ monitor: err = cmd.Wait() if ctx.Err() == context.DeadlineExceeded { - log.Printf("Timed out after %v\n", timeout) - return nil, nil, err + err = fmt.Errorf("timed out after %v", timeout) + errItem = makeErrorItem(err, logs) + log.Printf("error: %v", err) + return nil, nil, errItem, err } else if exiterr, ok := err.(*exec.ExitError); ok { - log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) - return nil, nil, err + err = fmt.Errorf("%s failed with exit code %d: %v", argv[0], exiterr.ExitCode(), exiterr) + errItem = makeErrorItem(err, logs) + log.Printf("error: %v", err) + return nil, nil, errItem, err } else if err != nil { - log.Printf("error: %s failed with error: %s\n", argv[0], err) - return nil, nil, err + err = fmt.Errorf("error: %s failed with error: %s", argv[0], err) + errItem = makeErrorItem(err, logs) + log.Printf("error: %v", err) + return nil, nil, errItem, err } if parseError { - log.Printf("error: could not parse item\n") - return nil, nil, errors.New("invalid JSON") + err = fmt.Errorf("could not parse an item") + errItem = makeErrorItem(err, logs) + log.Printf("error: %v", err) + return nil, nil, errItem, err } newState, err = os.ReadFile(stateFile.Name()) if err != nil { - return nil, nil, fmt.Errorf("error: failed to read state file: %v", err) + err = fmt.Errorf("error: failed to read state file: %v", err) + errItem = makeErrorItem(err, logs) + log.Printf("error: %v", err) + return nil, nil, errItem, err } return @@ -182,21 +242,34 @@ func ExecuteItemAction( state []byte, timeout time.Duration, postProcess func(item Item) Item, -) (Item, []byte, error) { +) ( + newItem Item, + newState []byte, + errItem Item, + err error, +) { + makeErrorItem := makeMakeErrItem(item.Source, argv) + itemJson, err := json.Marshal(item) if err != nil { - return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) + err = fmt.Errorf("failed to serialize item: %v", err) + errItem = makeErrorItem(err, nil) + return } - res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess) + res, newState, errItem, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess) if err != nil { - return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) + err = fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) + errItem = makeErrorItem(err, nil) + return } if len(res) != 1 { - return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) + err = fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) + errItem = makeErrorItem(err, nil) + return } - newItem := res[0] + newItem = res[0] BackfillItem(&newItem, &item) - return newItem, newState, nil + return } diff --git a/core/execute_test.go b/core/execute_test.go index 3bee29c..c6a8bfc 100644 --- a/core/execute_test.go +++ b/core/execute_test.go @@ -1,6 +1,7 @@ package core import ( + "strings" "testing" "time" ) @@ -25,7 +26,7 @@ func TestExecute(t *testing.T) { } } execute := func(argv []string) ([]Item, error) { - item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil) + item, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil) return item, err } @@ -48,7 +49,7 @@ func TestExecute(t *testing.T) { }) t.Run("Timeout", func(t *testing.T) { - res, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil) + res, _, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil) assertNotNil(t, err) assertLen(t, res, 0) }) @@ -63,7 +64,7 @@ func TestExecute(t *testing.T) { }) t.Run("ReadFromStdin", func(t *testing.T) { - res, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil) + res, _, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil) assertNil(t, err) assertLen(t, res, 1) if res[0].Id != "bar" { @@ -72,7 +73,7 @@ func TestExecute(t *testing.T) { }) t.Run("SetEnv", func(t *testing.T) { - res, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil) + res, _, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil) assertNil(t, err) assertLen(t, res, 1) if res[0].Id != "baz" { @@ -160,7 +161,7 @@ func TestExecute(t *testing.T) { t.Run("ReadState", func(t *testing.T) { argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} - res, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil) + res, _, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil) assertNil(t, err) assertLen(t, res, 1) if res[0].Title != "Hello world" { @@ -170,7 +171,7 @@ func TestExecute(t *testing.T) { t.Run("WriteState", func(t *testing.T) { argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} - res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil) + res, newState, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil) assertNil(t, err) assertLen(t, res, 1) if string(newState) != "Hello world" { @@ -180,7 +181,7 @@ func TestExecute(t *testing.T) { t.Run("PostprocessSetTtl", func(t *testing.T) { argv := []string{"jq", "-cn", `{id: "foo"}`} - res, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item { + res, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item { item.Ttl = 123456 return item }) @@ -190,4 +191,22 @@ func TestExecute(t *testing.T) { t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl) } }) + + t.Run("ErrorItem", func(t *testing.T) { + argv := []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World; printf '{"whoops": "my bad"}'`} + _, _, errItem, err := Execute("test", argv, nil, nil, "", time.Minute, nil) + assertNotNil(t, err) + if errItem.Id == "" { + t.Fatal("missing erritem id") + } + if errItem.Source != "default" { + t.Fatalf("unexpected erritem source: expected default, got %s", errItem.Source) + } + if !strings.Contains(errItem.Body, "Hello") || !strings.Contains(errItem.Body, "World") { + t.Fatal("missing stderr from erritem") + } + if !strings.Contains(errItem.Body, "whoops") { + t.Fatal("missing stdout from erritem") + } + }) } diff --git a/core/source.go b/core/source.go index d4ce625..4631529 100644 --- a/core/source.go +++ b/core/source.go @@ -228,8 +228,10 @@ func updateWithFetchedItemsTx( var updatedNewItems []Item for _, item := range newItems { var updatedItem Item - updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess) + var errItem Item + updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess) if err != nil { + AddErrorItem(db, errItem) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) } updatedNewItems = append(updatedNewItems, updatedItem) diff --git a/core/source_test.go b/core/source_test.go index 3456c21..b2a6588 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -147,7 +147,7 @@ func TestOnCreateAction(t *testing.T) { execute := func(argv []string) []Item { t.Helper() - items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil) + items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil) if err != nil { t.Fatalf("unexpected error executing test fetch: %v", err) } diff --git a/test/test_items.sh b/test/test_items.sh index 06d0850..225e413 100755 --- a/test/test_items.sh +++ b/test/test_items.sh @@ -6,6 +6,7 @@ rm tmp/intake.db* || true export INTAKE_DATA_DIR="tmp" tmp/intake migrate +# testing item display format tmp/intake source add -s feedtest tmp/intake item add -s feedtest --id "this-item-has-no-title" tmp/intake item add -s feedtest --id a --title "This item has only a title" @@ -28,18 +29,24 @@ tmp/intake item add -s feedtest --id q --title "TTS -10d" --tts "-864000" tmp/intake action add -s feedtest -a fetch -- jq -cn '{id: "0", title: "Returned by fetch"}' +# testing actions that modify items tmp/intake source add -s spook tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"' tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}' +tmp/intake channel add -c home -s feedtest +tmp/intake channel add -c home -s spook + +# testing empty feeds tmp/intake source add -s nothing tmp/intake action add -s nothing -a fetch -- true - -tmp/intake channel add -c all -s feedtest -tmp/intake channel add -c all -s spook - tmp/intake channel add -c none -s nothing +# testing error items +tmp/intake source add -s gonnafail +tmp/intake action add -s gonnafail -a fetch -- sh -c 'echo Three... 1>&2; echo Two... 1>&2; echo One... 1>&2; echo BOOM!' + +# testing feed paging tmp/intake source add -s page tmp/intake item add -s page --id 1 --title "Item 1" --body "This is the body of item 1" for i in $(seq 2 211); do @@ -47,5 +54,6 @@ for i in $(seq 2 211); do done tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212" +# default password, comment out to test no password echo "hello" | tmp/intake passwd --stdin echo "hello" | tmp/intake passwd --stdin --verify diff --git a/web/item.go b/web/item.go index 0c402b7..1128211 100644 --- a/web/item.go +++ b/web/item.go @@ -85,8 +85,9 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) { return } - newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) + newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) if err != nil { + core.AddErrorItem(env.db, errItem) http.Error(writer, err.Error(), 500) return } diff --git a/web/source.go b/web/source.go index 0fe7788..ec16f8e 100644 --- a/web/source.go +++ b/web/source.go @@ -55,8 +55,9 @@ func (env *Env) fetchSource(writer http.ResponseWriter, req *http.Request) { return } - items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) + items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) if err != nil { + core.AddErrorItem(env.db, errItem) http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500) return }