Add error item on action failure

This commit is contained in:
Tim Van Baak 2025-02-10 08:22:19 -08:00
parent f38cfa3008
commit 8d9dcec038
12 changed files with 179 additions and 51 deletions

View File

@ -118,7 +118,7 @@ Future features
* [ ] manual item edits, web * [ ] manual item edits, web
* [ ] source-level TTS * [ ] source-level TTS
* [ ] metric reporting * [ ] metric reporting
* [ ] on action failure, create an error item with logs * [x] on action failure, create an error item with logs
* [ ] items gracefully add new fields and `action` keys * [ ] items gracefully add new fields and `action` keys
* [ ] arbitrary date punt * [ ] arbitrary date punt
* [ ] sort crontab entries * [ ] sort crontab entries

View File

@ -99,8 +99,9 @@ func actionExecute(
} }
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
core.AddErrorItem(db, errItem)
log.Fatalf("error executing %s: %v", action, err) log.Fatalf("error executing %s: %v", action, err)
} }

View File

@ -48,8 +48,9 @@ func sourceFetch(source string, format string, dryRun bool) {
log.Fatalf("error: failed to load data for %s: %v", source, err) log.Fatalf("error: failed to load data for %s: %v", source, err)
} }
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
if err != nil { if err != nil {
core.AddErrorItem(db, errItem)
log.Fatalf("error: failed to execute fetch: %v", err) log.Fatalf("error: failed to execute fetch: %v", err)
} }

View File

@ -33,7 +33,7 @@ func init() {
func sourceTest(env []string, format string, cmd []string) { func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(format) formatter := formatAs(format)
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil) items, state, _, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
log.Printf("returned %d items", len(items)) log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state)) log.Printf("wrote %d bytes of state", len(state))
if err != nil { if err != nil {

22
core/error.go Normal file
View File

@ -0,0 +1,22 @@
package core
import "log"
func AddErrorItem(db DB, item Item) {
exists, err := SourceExists(db, "default")
if err != nil {
log.Printf("error: could not ensure default source: %v", err)
return
}
if !exists {
if err = AddSource(db, "default"); err != nil {
log.Printf("error: could not add default source: %v", err)
return
}
}
item.Source = "default"
err = AddItems(db, []Item{item})
if err != nil {
log.Printf("error: could not add error item: %v", err)
}
}

View File

@ -4,7 +4,6 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"log" "log"
@ -29,6 +28,22 @@ func readPipe(
} }
} }
// Two-stage error item helper, one stage to capture the immutable parameters and a second
// to include the variable error and logs.
func makeMakeErrItem(source string, argv []string) func(err error, logs []string) (item Item) {
return func(err error, logs []string) (item Item) {
item.Source = "default"
item.Id = RandomHex(16)
item.Title = fmt.Sprintf("Failed to execute for %s", source)
log := "no logs"
if len(logs) > 0 {
log = strings.Join(logs, "\n")
}
item.Body = fmt.Sprintf("<p><i>executing:</i> %#v</p><p><i>error:</i> %s</p><pre>%s</pre>", argv, err.Error(), log)
return
}
}
func Execute( func Execute(
source string, source string,
argv []string, argv []string,
@ -40,20 +55,30 @@ func Execute(
) ( ) (
items []Item, items []Item,
newState []byte, newState []byte,
errItem Item,
err error, err error,
) { ) {
log.Printf("executing %v", argv) log.Printf("executing %v", argv)
if len(argv) == 0 { makeErrorItem := makeMakeErrItem(source, argv)
return nil, nil, errors.New("empty argv") var logs []string
}
if source == "" { if source == "" {
return nil, nil, errors.New("empty source") err = fmt.Errorf("empty source")
errItem = makeErrorItem(err, logs)
return
}
if len(argv) == 0 {
err = fmt.Errorf("empty argv for %s", source)
errItem = makeErrorItem(err, logs)
return
} }
stateFile, err := os.CreateTemp("", "intake_state_*") stateFile, err := os.CreateTemp("", "intake_state_*")
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err) err = fmt.Errorf("failed to create temp state file: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
defer func() { defer func() {
if err := os.Remove(stateFile.Name()); err != nil { if err := os.Remove(stateFile.Name()); err != nil {
@ -63,7 +88,9 @@ func Execute(
_, err = stateFile.Write(state) _, err = stateFile.Write(state)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("error: failed to write state file: %v", err) err = fmt.Errorf("failed to write state file: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
env = append(env, "STATE_PATH="+stateFile.Name()) env = append(env, "STATE_PATH="+stateFile.Name())
@ -77,15 +104,21 @@ func Execute(
// Open pipes to the command // Open pipes to the command
stdin, err := cmd.StdinPipe() stdin, err := cmd.StdinPipe()
if err != nil { if err != nil {
return nil, nil, err err = fmt.Errorf("failed to open stdin pipe: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
stdout, err := cmd.StdoutPipe() stdout, err := cmd.StdoutPipe()
if err != nil { if err != nil {
return nil, nil, err err = fmt.Errorf("failed to open stdout pipe: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
stderr, err := cmd.StderrPipe() stderr, err := cmd.StderrPipe()
if err != nil { if err != nil {
return nil, nil, err err = fmt.Errorf("failed to open stderr pipe: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
cout := make(chan []byte) cout := make(chan []byte)
@ -100,12 +133,22 @@ func Execute(
// Kick off the command // Kick off the command
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
return nil, nil, err err = fmt.Errorf("failed to start execution: %v", err)
errItem = makeErrorItem(err, logs)
return
} }
// Write any input to stdin and close it // Write any input to stdin and close it
io.WriteString(stdin, input) if _, err = io.WriteString(stdin, input); err != nil {
stdin.Close() err = fmt.Errorf("failed to write to stdin: %v", err)
errItem = makeErrorItem(err, logs)
return
}
if err = stdin.Close(); err != nil {
err = fmt.Errorf("failed to close stdin: %v", err)
errItem = makeErrorItem(err, logs)
return
}
// Collect outputs until std{out,err} close // Collect outputs until std{out,err} close
parseError := false parseError := false
@ -118,7 +161,9 @@ monitor:
var item Item var item Item
err := json.Unmarshal(data, &item) err := json.Unmarshal(data, &item)
if err != nil || item.Id == "" { if err != nil || item.Id == "" {
log.Printf("[%s: stdout] %s", source, strings.TrimSpace(string(data))) msg := fmt.Sprintf("[%s: stdout] %s", source, strings.TrimSpace(string(data)))
log.Print(msg)
logs = append(logs, msg)
parseError = true parseError = true
} else { } else {
if postProcess != nil { if postProcess != nil {
@ -127,12 +172,16 @@ monitor:
item.Active = true // These fields aren't up to item.Active = true // These fields aren't up to
item.Created = 0 // the action to set and item.Created = 0 // the action to set and
item.Source = source // shouldn't be overrideable item.Source = source // shouldn't be overrideable
log.Printf("[%s: item] %s\n", source, item.Id) msg := fmt.Sprintf("[%s: item] %s", source, item.Id)
log.Print(msg)
logs = append(logs, msg)
items = append(items, item) items = append(items, item)
} }
case data := <-cerr: case data := <-cerr:
log.Printf("[%s: stderr] %s\n", source, data) msg := fmt.Sprintf("[%s: stderr] %s", source, strings.TrimSpace(string(data)))
log.Print(msg)
logs = append(logs, msg)
case <-coutDone: case <-coutDone:
stdoutDone = true stdoutDone = true
@ -150,24 +199,35 @@ monitor:
err = cmd.Wait() err = cmd.Wait()
if ctx.Err() == context.DeadlineExceeded { if ctx.Err() == context.DeadlineExceeded {
log.Printf("Timed out after %v\n", timeout) err = fmt.Errorf("timed out after %v", timeout)
return nil, nil, err errItem = makeErrorItem(err, logs)
log.Printf("error: %v", err)
return nil, nil, errItem, err
} else if exiterr, ok := err.(*exec.ExitError); ok { } else if exiterr, ok := err.(*exec.ExitError); ok {
log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) err = fmt.Errorf("%s failed with exit code %d: %v", argv[0], exiterr.ExitCode(), exiterr)
return nil, nil, err errItem = makeErrorItem(err, logs)
log.Printf("error: %v", err)
return nil, nil, errItem, err
} else if err != nil { } else if err != nil {
log.Printf("error: %s failed with error: %s\n", argv[0], err) err = fmt.Errorf("error: %s failed with error: %s", argv[0], err)
return nil, nil, err errItem = makeErrorItem(err, logs)
log.Printf("error: %v", err)
return nil, nil, errItem, err
} }
if parseError { if parseError {
log.Printf("error: could not parse item\n") err = fmt.Errorf("could not parse an item")
return nil, nil, errors.New("invalid JSON") errItem = makeErrorItem(err, logs)
log.Printf("error: %v", err)
return nil, nil, errItem, err
} }
newState, err = os.ReadFile(stateFile.Name()) newState, err = os.ReadFile(stateFile.Name())
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("error: failed to read state file: %v", err) err = fmt.Errorf("error: failed to read state file: %v", err)
errItem = makeErrorItem(err, logs)
log.Printf("error: %v", err)
return nil, nil, errItem, err
} }
return return
@ -182,21 +242,34 @@ func ExecuteItemAction(
state []byte, state []byte,
timeout time.Duration, timeout time.Duration,
postProcess func(item Item) Item, postProcess func(item Item) Item,
) (Item, []byte, error) { ) (
newItem Item,
newState []byte,
errItem Item,
err error,
) {
makeErrorItem := makeMakeErrItem(item.Source, argv)
itemJson, err := json.Marshal(item) itemJson, err := json.Marshal(item)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) err = fmt.Errorf("failed to serialize item: %v", err)
errItem = makeErrorItem(err, nil)
return
} }
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess) res, newState, errItem, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) err = fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
errItem = makeErrorItem(err, nil)
return
} }
if len(res) != 1 { if len(res) != 1 {
return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) err = fmt.Errorf("expected action to produce exactly one item, got %d", len(res))
errItem = makeErrorItem(err, nil)
return
} }
newItem := res[0] newItem = res[0]
BackfillItem(&newItem, &item) BackfillItem(&newItem, &item)
return newItem, newState, nil return
} }

View File

@ -1,6 +1,7 @@
package core package core
import ( import (
"strings"
"testing" "testing"
"time" "time"
) )
@ -25,7 +26,7 @@ func TestExecute(t *testing.T) {
} }
} }
execute := func(argv []string) ([]Item, error) { execute := func(argv []string) ([]Item, error) {
item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil) item, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
return item, err return item, err
} }
@ -48,7 +49,7 @@ func TestExecute(t *testing.T) {
}) })
t.Run("Timeout", func(t *testing.T) { t.Run("Timeout", func(t *testing.T) {
res, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil) res, _, _, err := Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
assertNotNil(t, err) assertNotNil(t, err)
assertLen(t, res, 0) assertLen(t, res, 0)
}) })
@ -63,7 +64,7 @@ func TestExecute(t *testing.T) {
}) })
t.Run("ReadFromStdin", func(t *testing.T) { t.Run("ReadFromStdin", func(t *testing.T) {
res, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil) res, _, _, err := Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
assertNil(t, err) assertNil(t, err)
assertLen(t, res, 1) assertLen(t, res, 1)
if res[0].Id != "bar" { if res[0].Id != "bar" {
@ -72,7 +73,7 @@ func TestExecute(t *testing.T) {
}) })
t.Run("SetEnv", func(t *testing.T) { t.Run("SetEnv", func(t *testing.T) {
res, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil) res, _, _, err := Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
assertNil(t, err) assertNil(t, err)
assertLen(t, res, 1) assertLen(t, res, 1)
if res[0].Id != "baz" { if res[0].Id != "baz" {
@ -160,7 +161,7 @@ func TestExecute(t *testing.T) {
t.Run("ReadState", func(t *testing.T) { t.Run("ReadState", func(t *testing.T) {
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil) res, _, _, err := Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
assertNil(t, err) assertNil(t, err)
assertLen(t, res, 1) assertLen(t, res, 1)
if res[0].Title != "Hello world" { if res[0].Title != "Hello world" {
@ -170,7 +171,7 @@ func TestExecute(t *testing.T) {
t.Run("WriteState", func(t *testing.T) { t.Run("WriteState", func(t *testing.T) {
argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} argv := []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil) res, newState, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
assertNil(t, err) assertNil(t, err)
assertLen(t, res, 1) assertLen(t, res, 1)
if string(newState) != "Hello world" { if string(newState) != "Hello world" {
@ -180,7 +181,7 @@ func TestExecute(t *testing.T) {
t.Run("PostprocessSetTtl", func(t *testing.T) { t.Run("PostprocessSetTtl", func(t *testing.T) {
argv := []string{"jq", "-cn", `{id: "foo"}`} argv := []string{"jq", "-cn", `{id: "foo"}`}
res, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item { res, _, _, err := Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
item.Ttl = 123456 item.Ttl = 123456
return item return item
}) })
@ -190,4 +191,22 @@ func TestExecute(t *testing.T) {
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl) t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
} }
}) })
t.Run("ErrorItem", func(t *testing.T) {
argv := []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World; printf '{"whoops": "my bad"}'`}
_, _, errItem, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
assertNotNil(t, err)
if errItem.Id == "" {
t.Fatal("missing erritem id")
}
if errItem.Source != "default" {
t.Fatalf("unexpected erritem source: expected default, got %s", errItem.Source)
}
if !strings.Contains(errItem.Body, "Hello") || !strings.Contains(errItem.Body, "World") {
t.Fatal("missing stderr from erritem")
}
if !strings.Contains(errItem.Body, "whoops") {
t.Fatal("missing stdout from erritem")
}
})
} }

View File

@ -228,8 +228,10 @@ func updateWithFetchedItemsTx(
var updatedNewItems []Item var updatedNewItems []Item
for _, item := range newItems { for _, item := range newItems {
var updatedItem Item var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess) var errItem Item
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
AddErrorItem(db, errItem)
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
} }
updatedNewItems = append(updatedNewItems, updatedItem) updatedNewItems = append(updatedNewItems, updatedItem)

View File

@ -147,7 +147,7 @@ func TestOnCreateAction(t *testing.T) {
execute := func(argv []string) []Item { execute := func(argv []string) []Item {
t.Helper() t.Helper()
items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil) items, _, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error executing test fetch: %v", err) t.Fatalf("unexpected error executing test fetch: %v", err)
} }

View File

@ -6,6 +6,7 @@ rm tmp/intake.db* || true
export INTAKE_DATA_DIR="tmp" export INTAKE_DATA_DIR="tmp"
tmp/intake migrate tmp/intake migrate
# testing item display format
tmp/intake source add -s feedtest tmp/intake source add -s feedtest
tmp/intake item add -s feedtest --id "this-item-has-no-title" tmp/intake item add -s feedtest --id "this-item-has-no-title"
tmp/intake item add -s feedtest --id a --title "This item has only a title" tmp/intake item add -s feedtest --id a --title "This item has only a title"
@ -28,18 +29,24 @@ tmp/intake item add -s feedtest --id q --title "TTS -10d" --tts "-864000"
tmp/intake action add -s feedtest -a fetch -- jq -cn '{id: "0", title: "Returned by fetch"}' tmp/intake action add -s feedtest -a fetch -- jq -cn '{id: "0", title: "Returned by fetch"}'
# testing actions that modify items
tmp/intake source add -s spook tmp/intake source add -s spook
tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"' tmp/intake action add -s spook -a spookier -- jq -c '.title = .title + "o"'
tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}' tmp/intake item add -s spook --id boo --title "Boo" --action '{"spookier": true}'
tmp/intake channel add -c home -s feedtest
tmp/intake channel add -c home -s spook
# testing empty feeds
tmp/intake source add -s nothing tmp/intake source add -s nothing
tmp/intake action add -s nothing -a fetch -- true tmp/intake action add -s nothing -a fetch -- true
tmp/intake channel add -c all -s feedtest
tmp/intake channel add -c all -s spook
tmp/intake channel add -c none -s nothing tmp/intake channel add -c none -s nothing
# testing error items
tmp/intake source add -s gonnafail
tmp/intake action add -s gonnafail -a fetch -- sh -c 'echo Three... 1>&2; echo Two... 1>&2; echo One... 1>&2; echo BOOM!'
# testing feed paging
tmp/intake source add -s page tmp/intake source add -s page
tmp/intake item add -s page --id 1 --title "Item 1" --body "This is the body of item 1" tmp/intake item add -s page --id 1 --title "Item 1" --body "This is the body of item 1"
for i in $(seq 2 211); do for i in $(seq 2 211); do
@ -47,5 +54,6 @@ for i in $(seq 2 211); do
done done
tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212" tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212"
# default password, comment out to test no password
echo "hello" | tmp/intake passwd --stdin echo "hello" | tmp/intake passwd --stdin
echo "hello" | tmp/intake passwd --stdin --verify echo "hello" | tmp/intake passwd --stdin --verify

View File

@ -85,8 +85,9 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
core.AddErrorItem(env.db, errItem)
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return return
} }

View File

@ -55,8 +55,9 @@ func (env *Env) fetchSource(writer http.ResponseWriter, req *http.Request) {
return return
} }
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) items, newState, errItem, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
if err != nil { if err != nil {
core.AddErrorItem(env.db, errItem)
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500) http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)
return return
} }