From 10f429432894ced9451c28457dc91cd30773ab37 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Fri, 17 Jan 2025 13:49:23 -0800 Subject: [PATCH] Add Execute() and test command --- cmd/feed.go | 13 ++--- cmd/test.go | 52 ++++++++++++++++++ core/action.go | 129 ++++++++++++++++++++++++++++++++++++++++++++ core/action_test.go | 114 +++++++++++++++++++++++++++++++++++++++ core/item.go | 13 +++++ 5 files changed, 311 insertions(+), 10 deletions(-) create mode 100644 cmd/test.go create mode 100644 core/action.go create mode 100644 core/action_test.go diff --git a/cmd/feed.go b/cmd/feed.go index bccb865..2c22087 100644 --- a/cmd/feed.go +++ b/cmd/feed.go @@ -39,16 +39,9 @@ func init() { } func feed() { - var formatter func(core.Item) string - switch feedFormat { - case "headlines": - formatter = core.FormatAsHeadline - case "json": - formatter = core.FormatAsJson - case "short": - formatter = core.FormatAsShort - default: - log.Fatalf("error: invalid format %s", feedFormat) + formatter, err := core.FormatAs(feedFormat) + if err != nil { + log.Fatal(err) } db, err := sql.Open("sqlite3", getDbPath()) diff --git a/cmd/test.go b/cmd/test.go new file mode 100644 index 0000000..2ada763 --- /dev/null +++ b/cmd/test.go @@ -0,0 +1,52 @@ +package cmd + +import ( + "fmt" + "log" + "time" + + "github.com/Jaculabilis/intake/core" + "github.com/spf13/cobra" +) + +var testCmd = &cobra.Command{ + Use: "test [flags] -- command", + Short: "Test a feed source", + Long: `Execute a command as if it were a feed source's fetch action. + +The display format of the returned items is the same as "intake feed".`, + Run: func(cmd *cobra.Command, args []string) { + l := cmd.Flags().ArgsLenAtDash() + if l == -1 { + test(nil) + } else { + test(args[l:]) + } + }, +} + +var testEnv []string +var testFormat string + +func init() { + rootCmd.AddCommand(testCmd) + + testCmd.Flags().StringArrayVarP(&testEnv, "env", "e", nil, "Environment variables to set, in the form KEY=VAL") + testCmd.Flags().StringVarP(&testFormat, "format", "f", "headlines", "Feed format for returned items.") +} + +func test(cmd []string) { + formatter, err := core.FormatAs(testFormat) + if err != nil { + log.Fatal(err) + } + + items, err := core.Execute(cmd, testEnv, "", time.Minute) + log.Printf("Returned %d items", len(items)) + if err != nil { + log.Fatal(err) + } + for _, item := range items { + fmt.Println(formatter(item)) + } +} diff --git a/core/action.go b/core/action.go new file mode 100644 index 0000000..4abf981 --- /dev/null +++ b/core/action.go @@ -0,0 +1,129 @@ +package core + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "io" + "log" + "os" + "os/exec" + "strings" + "time" +) + +func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) { + var item Item + parseError := false + scanout := bufio.NewScanner(stdout) + for scanout.Scan() { + data := scanout.Bytes() + err := json.Unmarshal(data, &item) + if err != nil { + log.Printf("[stdout] %s\n", strings.TrimSpace(string(data))) + parseError = true + } else { + log.Printf("[item] %s\n", item.Id) + items <- item + } + } + // Only send the parsing result at the end, to block main until stdout is drained + cparse <- parseError + close(items) +} + +func readStderr(stderr io.ReadCloser, done chan bool) { + scanerr := bufio.NewScanner(stderr) + for scanerr.Scan() { + text := strings.TrimSpace(scanerr.Text()) + log.Printf("[stderr] %s\n", text) + } + done <- true +} + +func writeStdin(stdin io.WriteCloser, text string) { + defer stdin.Close() + io.WriteString(stdin, text) +} + +func Execute( + argv []string, + env []string, + input string, + timeout time.Duration, +) ([]Item, error) { + log.Printf("Executing %v", argv) + + if len(argv) == 0 { + return nil, errors.New("error: empty argv") + } + + env = append(env, "STATE_PATH=") + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + cmd := exec.CommandContext(ctx, argv[0], argv[1:]...) + cmd.Env = append(os.Environ(), env...) + cmd.WaitDelay = time.Second * 5 + + // Open pipes to the command + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, err + } + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + + cout := make(chan Item) + cparse := make(chan bool) + cerr := make(chan bool) + + // Sink routine for items produced + var items []Item + go func() { + for item := range cout { + items = append(items, item) + } + }() + + // Routines handling the process i/o + go writeStdin(stdin, input) + go readStdout(stdout, cout, cparse) + go readStderr(stderr, cerr) + + // Kick off the command + err = cmd.Start() + if err != nil { + return nil, err + } + + // Block until std{out,err} close + <-cerr + parseError := <-cparse + + err = cmd.Wait() + if ctx.Err() == context.DeadlineExceeded { + log.Printf("Timed out after %v\n", timeout) + return nil, err + } else if exiterr, ok := err.(*exec.ExitError); ok { + log.Printf("error: %s failed with exit code %d\n", argv[0], exiterr.ExitCode()) + return nil, err + } else if err != nil { + log.Printf("error: %s failed with error: %s\n", argv[0], err) + return nil, err + } + + if parseError { + log.Printf("error: could not parse item\n") + return nil, errors.New("invalid JSON") + } + + return items, nil +} diff --git a/core/action_test.go b/core/action_test.go new file mode 100644 index 0000000..f6ce4d8 --- /dev/null +++ b/core/action_test.go @@ -0,0 +1,114 @@ +package core + +import ( + "testing" + "time" +) + +func TestExecute(t *testing.T) { + assertLen := func(items []Item, length int) { + if len(items) != length { + t.Fatalf("Expected %d items, got %d", length, len(items)) + } + } + assertNil := func(err error) { + if err != nil { + t.Fatal(err) + } + } + assertNotNil := func(err error) { + if err == nil { + t.Fatal("expected err") + } + } + + res, err := Execute( + []string{"true"}, + nil, + "", + time.Minute, + ) + assertNil(err) + assertLen(res, 0) + + // Exit with error code + res, err = Execute( + []string{"false"}, + nil, + "", + time.Minute, + ) + assertNotNil(err) + assertLen(res, 0) + + res, err = Execute( + []string{"sh", "-c", "exit 22"}, + nil, + "", + time.Minute, + ) + assertNotNil(err) + assertLen(res, 0) + + // Timeout + res, err = Execute( + []string{"sleep", "10"}, + nil, + "", + time.Millisecond, + ) + assertNotNil(err) + assertLen(res, 0) + + // Returning items + res, err = Execute( + []string{"jq", "-cn", `{id: "foo"}`}, + nil, + "", + time.Minute, + ) + assertNil(err) + assertLen(res, 1) + if res[0].Id != "foo" { + t.Fatal("jq -cn test failed") + } + + // Read from stdin + res, err = Execute( + []string{"jq", "-cR", `{id: .}`}, + nil, + "bar", + time.Minute, + ) + assertNil(err) + assertLen(res, 1) + if res[0].Id != "bar" { + t.Fatal("jq -cR test failed") + } + + // Set env + res, err = Execute( + []string{"jq", "-cn", `{id: env.HELLO}`}, + []string{"HELLO=baz"}, + "", + time.Minute, + ) + assertNil(err) + assertLen(res, 1) + if res[0].Id != "baz" { + t.Fatal("jq -cn env test failed") + } + + // With logging on stderr + res, err = Execute( + []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World`}, + nil, + "", + time.Minute, + ) + assertNil(err) + assertLen(res, 1) + if res[0].Id != "box" { + t.Fatal("stderr test failed") + } +} diff --git a/core/item.go b/core/item.go index 04c952c..886a7f9 100644 --- a/core/item.go +++ b/core/item.go @@ -37,3 +37,16 @@ func FormatAsJson(item Item) string { func FormatAsShort(item Item) string { return fmt.Sprintf("%s/%s", item.Source, item.Id) } + +func FormatAs(format string) (func(item Item) string, error) { + switch format { + case "headlines": + return FormatAsHeadline, nil + case "json": + return FormatAsJson, nil + case "short": + return FormatAsShort, nil + default: + return nil, fmt.Errorf("error: invalid format %s", format) + } +}