From 9132bb46d73f80827211e21754889a2158ffad47 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Fri, 31 Jan 2025 14:29:10 -0800 Subject: [PATCH] Reduce action.go to action functions --- core/action.go | 180 -------------------------------------------- core/action_test.go | 146 ----------------------------------- 2 files changed, 326 deletions(-) diff --git a/core/action.go b/core/action.go index f097626..57bfa44 100644 --- a/core/action.go +++ b/core/action.go @@ -1,18 +1,8 @@ package core import ( - "bufio" - "context" "database/sql/driver" "encoding/json" - "errors" - "fmt" - "io" - "log" - "os" - "os/exec" - "strings" - "time" ) // Type alias for storing string array as jsonb @@ -85,173 +75,3 @@ func DeleteAction(db DB, source string, name string) error { `, source, name) return err } - -func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) { - var item Item - parseError := false - scanout := bufio.NewScanner(stdout) - for scanout.Scan() { - data := scanout.Bytes() - err := json.Unmarshal(data, &item) - if err != nil || item.Id == "" { - log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) - parseError = true - } else { - item.Active = true // These fields aren't up to - item.Created = 0 // the action to set and - item.Source = source // shouldn't be overrideable - log.Printf("[%s: item] %s\n", source, item.Id) - items <- item - } - } - // Only send the parsing result at the end, to block main until stdout is drained - cparse <- parseError - close(items) -} - -func readStderr(stderr io.ReadCloser, source string, done chan bool) { - scanerr := bufio.NewScanner(stderr) - for scanerr.Scan() { - text := strings.TrimSpace(scanerr.Text()) - log.Printf("[%s: stderr] %s\n", source, text) - } - done <- true -} - -func writeStdin(stdin io.WriteCloser, text string) { - defer stdin.Close() - io.WriteString(stdin, text) -} - -func Execute( - source string, - argv []string, - env []string, - state []byte, - input string, - timeout time.Duration, -) ([]Item, []byte, error) { - log.Printf("executing %v", argv) - - if len(argv) == 0 { - return nil, nil, errors.New("empty argv") - } - if source == "" { - return nil, nil, errors.New("empty source") - } - - stateFile, err := os.CreateTemp("", "intake_state_*") - if err != nil { - return nil, nil, fmt.Errorf("error: failed to create temp state file: %v", err) - } - defer func() { - if err := os.Remove(stateFile.Name()); err != nil { - log.Printf("error: failed to delete %s", stateFile.Name()) - } - }() - - _, err = stateFile.Write(state) - if err != nil { - return nil, nil, fmt.Errorf("error: failed to write state file: %v", err) - } - - env = append(env, "STATE_PATH="+stateFile.Name()) - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - cmd := exec.CommandContext(ctx, argv[0], argv[1:]...) - cmd.Env = append(os.Environ(), env...) - cmd.WaitDelay = time.Second * 5 - - // Open pipes to the command - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, nil, err - } - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, nil, err - } - stderr, err := cmd.StderrPipe() - if err != nil { - return nil, nil, err - } - - cout := make(chan Item) - cparse := make(chan bool) - cerr := make(chan bool) - - // Sink routine for items produced - var items []Item - go func() { - for item := range cout { - items = append(items, item) - } - }() - - // Routines handling the process i/o - go writeStdin(stdin, input) - go readStdout(stdout, source, cout, cparse) - go readStderr(stderr, source, cerr) - - // Kick off the command - err = cmd.Start() - if err != nil { - return nil, nil, err - } - - // Block until std{out,err} close - <-cerr - parseError := <-cparse - - err = cmd.Wait() - if ctx.Err() == context.DeadlineExceeded { - log.Printf("Timed out after %v\n", timeout) - return nil, nil, err - } else if exiterr, ok := err.(*exec.ExitError); ok { - log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) - return nil, nil, err - } else if err != nil { - log.Printf("error: %s failed with error: %s\n", argv[0], err) - return nil, nil, err - } - - if parseError { - log.Printf("error: could not parse item\n") - return nil, nil, errors.New("invalid JSON") - } - - newState, err := os.ReadFile(stateFile.Name()) - if err != nil { - return nil, nil, fmt.Errorf("error: failed to read state file: %v", err) - } - - return items, newState, nil -} - -// Execute an action that takes an item as input and returns the item modified. -// This is basically just a wrapper over [Execute] that handles the input and backfilling. -func ExecuteItemAction( - item Item, - argv []string, - env []string, - state []byte, - timeout time.Duration, -) (Item, []byte, error) { - itemJson, err := json.Marshal(item) - if err != nil { - return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) - } - - res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout) - if err != nil { - return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) - } - if len(res) != 1 { - return Item{}, nil, fmt.Errorf("expected action to produce exactly one item, got %d", len(res)) - } - newItem := res[0] - BackfillItem(&newItem, &item) - - return newItem, newState, nil -} diff --git a/core/action_test.go b/core/action_test.go index 9a33f04..7941215 100644 --- a/core/action_test.go +++ b/core/action_test.go @@ -2,7 +2,6 @@ package core import ( "testing" - "time" ) func TestActionCreate(t *testing.T) { @@ -54,148 +53,3 @@ func TestActionCreate(t *testing.T) { t.Fatal(err) } } - -func TestExecute(t *testing.T) { - assertLen := func(items []Item, length int) { - if len(items) != length { - t.Fatalf("Expected %d items, got %d", length, len(items)) - } - } - assertNil := func(err error) { - if err != nil { - t.Fatal(err) - } - } - assertNotNil := func(err error) { - if err == nil { - t.Fatal("expected err") - } - } - execute := func(argv []string) ([]Item, error) { - item, _, err := Execute("_", argv, nil, nil, "", time.Minute) - return item, err - } - - res, err := execute([]string{"true"}) - assertNil(err) - assertLen(res, 0) - - // Exit with error code - res, err = execute([]string{"false"}) - assertNotNil(err) - assertLen(res, 0) - - res, err = execute([]string{"sh", "-c", "exit 22"}) - assertNotNil(err) - assertLen(res, 0) - - // Timeout - res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond) - assertNotNil(err) - assertLen(res, 0) - - // Returning items - res, err = execute([]string{"jq", "-cn", `{id: "foo"}`}) - assertNil(err) - assertLen(res, 1) - if res[0].Id != "foo" { - t.Fatal("jq -cn test failed") - } - - // Read from stdin - res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute) - assertNil(err) - assertLen(res, 1) - if res[0].Id != "bar" { - t.Fatal("jq -cR test failed") - } - - // Set env - res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute) - assertNil(err) - assertLen(res, 1) - if res[0].Id != "baz" { - t.Fatal("jq -cn env test failed") - } - - // With logging on stderr - res, err = execute([]string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World`}) - assertNil(err) - assertLen(res, 1) - if res[0].Id != "box" { - t.Fatal("stderr test failed") - } - - // Unsupported item field is silently discarded - res, err = execute([]string{"jq", "-cn", `{id: "test", unknownField: "what is this"}`}) - assertNil(err) - assertLen(res, 1) - - // Field with incorrect type fails - res, err = execute([]string{"jq", "-cn", `{id: ["list"]}`}) - assertNotNil(err) - assertLen(res, 0) - - res, err = execute([]string{"jq", "-cn", `{id: "test", time: "0"}`}) - assertNotNil(err) - assertLen(res, 0) - - res, err = execute([]string{"jq", "-cn", `{id: null}`}) - assertNotNil(err) - assertLen(res, 0) - - // Items with duplicate ids is not a fetch error, but it will fail to update - res, err = execute([]string{"jq", "-cn", `["a", "a"] | .[] | {id: .}`}) - assertNil(err) - assertLen(res, 2) - - // Action keys are detected even with empty values - res, err = execute([]string{"jq", "-cn", `{id: "test", action: {"hello": null}}`}) - assertNil(err) - assertLen(res, 1) - if res[0].Action["hello"] == nil { - t.Fatal("missing hello action") - } - if res[0].Action["goodbye"] != nil { - t.Fatal("nonexistent action should key to nil in Action") - } - - res, err = execute([]string{"jq", "-cn", `{id: "test", action: {"hello": ""}}`}) - assertNil(err) - assertLen(res, 1) - if res[0].Action["hello"] == nil { - t.Fatal("missing hello action") - } - - res, err = execute([]string{"jq", "-cn", `{id: "test", action: {"hello": []}}`}) - assertNil(err) - assertLen(res, 1) - if res[0].Action["hello"] == nil { - t.Fatal("missing hello action") - } - - res, err = execute([]string{"jq", "-cn", `{id: "test", action: {"hello": {}}}`}) - assertNil(err) - assertLen(res, 1) - if res[0].Action["hello"] == nil { - t.Fatal("missing hello action") - } - - // Read state - argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} - res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute) - assertNil(err) - assertLen(res, 1) - if res[0].Title != "Hello world" { - t.Fatalf("expected 'Hello world' from read state, got '%s'", res[0].Title) - } - - // Write state - argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} - res, newState, err := Execute("_", argv, nil, nil, "", time.Minute) - assertNil(err) - assertLen(res, 1) - if string(newState) != "Hello world" { - t.Fatalf("expected 'Hello world' from write state, got %s", string(newState)) - } -}