From e014b9f9b29ea92ac2bb524a961f5d365e5640fd Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Sun, 9 Feb 2025 22:59:04 -0800 Subject: [PATCH] Simplify Execute goroutines --- core/execute.go | 130 +++++++++++++++++++++++--------------------- core/source_test.go | 2 +- 2 files changed, 69 insertions(+), 63 deletions(-) diff --git a/core/execute.go b/core/execute.go index 533055e..378326b 100644 --- a/core/execute.go +++ b/core/execute.go @@ -14,50 +14,19 @@ import ( "time" ) -func readStdout( - stdout io.ReadCloser, - source string, - postProcess func(item Item) Item, - items chan Item, - cparse chan bool, +func readPipe( + f io.ReadCloser, + send chan []byte, + done chan int, ) { - var item Item - parseError := false - scanout := bufio.NewScanner(stdout) - for scanout.Scan() { - data := scanout.Bytes() - err := json.Unmarshal(data, &item) - if err != nil || item.Id == "" { - log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) - parseError = true - } else { - if postProcess != nil { - item = postProcess(item) - } - item.Active = true // These fields aren't up to - item.Created = 0 // the action to set and - item.Source = source // shouldn't be overrideable - log.Printf("[%s: item] %s\n", source, item.Id) - items <- item - } + defer func() { + done <- 0 + }() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + data := scanner.Bytes() + send <- data } - // Only send the parsing result at the end, to block main until stdout is drained - cparse <- parseError - close(items) -} - -func readStderr(stderr io.ReadCloser, source string, done chan bool) { - scanerr := bufio.NewScanner(stderr) - for scanerr.Scan() { - text := strings.TrimSpace(scanerr.Text()) - log.Printf("[%s: stderr] %s\n", source, text) - } - done <- true -} - -func writeStdin(stdin io.WriteCloser, text string) { - defer stdin.Close() - io.WriteString(stdin, text) } func Execute( @@ -68,7 +37,11 @@ func Execute( input string, timeout time.Duration, postProcess func(item Item) Item, -) ([]Item, []byte, error) { +) ( + items []Item, + newState []byte, + err error, +) { log.Printf("executing %v", argv) if len(argv) == 0 { @@ -115,22 +88,14 @@ func Execute( return nil, nil, err } - cout := make(chan Item) - cparse := make(chan bool) - cerr := make(chan bool) - - // Sink routine for items produced - var items []Item - go func() { - for item := range cout { - items = append(items, item) - } - }() + cout := make(chan []byte) + cerr := make(chan []byte) + coutDone := make(chan int) + cerrDone := make(chan int) // Routines handling the process i/o - go writeStdin(stdin, input) - go readStdout(stdout, source, postProcess, cout, cparse) - go readStderr(stderr, source, cerr) + go readPipe(stdout, cout, coutDone) + go readPipe(stderr, cerr, cerrDone) // Kick off the command err = cmd.Start() @@ -138,9 +103,50 @@ func Execute( return nil, nil, err } - // Block until std{out,err} close - <-cerr - parseError := <-cparse + // Write any input to stdin and close it + io.WriteString(stdin, input) + stdin.Close() + + // Collect outputs until std{out,err} close + parseError := false + stdoutDone := false + stderrDone := false +monitor: + for { + select { + case data := <-cout: + var item Item + err := json.Unmarshal(data, &item) + if err != nil || item.Id == "" { + log.Printf("[%s: stdout] %s", source, strings.TrimSpace(string(data))) + parseError = true + } else { + if postProcess != nil { + item = postProcess(item) + } + item.Active = true // These fields aren't up to + item.Created = 0 // the action to set and + item.Source = source // shouldn't be overrideable + log.Printf("[%s: item] %s\n", source, item.Id) + items = append(items, item) + } + + case data := <-cerr: + log.Printf("[%s: stderr] %s\n", source, data) + + case <-coutDone: + stdoutDone = true + if stdoutDone && stderrDone { + break monitor + } + + case <-cerrDone: + stderrDone = true + if stdoutDone && stderrDone { + break monitor + } + } + } err = cmd.Wait() if ctx.Err() == context.DeadlineExceeded { @@ -159,12 +165,12 @@ func Execute( return nil, nil, errors.New("invalid JSON") } - newState, err := os.ReadFile(stateFile.Name()) + newState, err = os.ReadFile(stateFile.Name()) if err != nil { return nil, nil, fmt.Errorf("error: failed to read state file: %v", err) } - return items, newState, nil + return } // Execute an action that takes an item as input and returns the item modified. diff --git a/core/source_test.go b/core/source_test.go index eb1812a..7ab455f 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -183,7 +183,7 @@ func TestOnCreateAction(t *testing.T) { items := execute([]string{"jq", "-cn", `{id: "one"}`}) add, _, err := update(items) if add != 1 || err != nil { - t.Fatal("failed update with noop oncreate") + t.Fatalf("failed update with noop oncreate: %v", err) } updated := getItem("one") updated.Created = 0 // zero out for comparison with pre-insert item