From 2894493d347d0486936b3cca777a66fc20570946 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Wed, 5 Feb 2025 13:21:31 -0800 Subject: [PATCH] Support INTAKE_TTX env settings --- README.md | 10 +++++++-- cmd/actionExecute.go | 7 +++++- cmd/sourceFetch.go | 7 +++++- cmd/sourceTest.go | 5 ++++- core/execute.go | 17 +++++++++++--- core/execute_test.go | 24 +++++++++++++++----- core/source.go | 53 +++++++++++++++++++++++++++++++++++++++++++- core/source_test.go | 25 ++++++++++++++++++++- web/item.go | 7 +++++- 9 files changed, 138 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index c799b8b..8fdb559 100644 --- a/README.md +++ b/README.md @@ -102,11 +102,17 @@ Any unspecified field is equivalent to the empty string, object, or 0, depending Existing items are updated with new values when a fetch or action produces them, with some exceptions: * Automatic fields cannot be changed. -* If a field's previous value is non-empty and the new value is empty, the old value is kept. +* Source-level settings for `ttl`, `ttd`, or `tts` override the item's values. +* Fields cannot be updated from a non-empty value to an empty value. + If a field's previous value is non-empty and the new value is empty, the old value is kept. ### Sources -A source is identified by its name. A minimally functional source requires a `fetch` action that returns items. +A source is identified by its name. +A minimally functional source requires a `fetch` action that returns items. + +TTL, TTD, and TTS can be configured at the source level by setting the environment variables `INTAKE_TTL`, `INTAKE_TTD`, or `INTAKE_TTS` to an integer value. +These values override any `ttl`, `ttd`, or `tts` value returned by a fetch or action. 
### Action API diff --git a/cmd/actionExecute.go b/cmd/actionExecute.go index 64930c5..f974d68 100644 --- a/cmd/actionExecute.go +++ b/cmd/actionExecute.go @@ -109,7 +109,12 @@ func actionExecute( log.Fatalf("error: failed to get action: %v", err) } - newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) + postProcess, err := core.GetSourcePostProcessor(db, source) + if err != nil { + log.Fatalf("error: failed to get source post-processor: %v", err) + } + + newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) if err != nil { log.Fatalf("error executing %s: %v", action, err) } diff --git a/cmd/sourceFetch.go b/cmd/sourceFetch.go index 4a9aa14..74953a5 100644 --- a/cmd/sourceFetch.go +++ b/cmd/sourceFetch.go @@ -58,7 +58,12 @@ func sourceFetch(source string, format string, dryRun bool) { log.Fatalf("error: failed to get fetch action: %v", err) } - items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute) + postProcess, err := core.GetSourcePostProcessor(db, source) + if err != nil { + log.Fatalf("error: failed to get source post-processor: %v", err) + } + + items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess) if err != nil { log.Fatalf("error: failed to execute fetch: %v", err) } diff --git a/cmd/sourceTest.go b/cmd/sourceTest.go index 62a47c4..116d743 100644 --- a/cmd/sourceTest.go +++ b/cmd/sourceTest.go @@ -14,6 +14,9 @@ var sourceTestCmd = &cobra.Command{ Short: "Test a fetch action", Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action. +Source-level configuration that is normally set via environment variable, +such as INTAKE_TTL, will not be applied by --env. 
+ %s`, makeFormatHelpText()), Run: func(cmd *cobra.Command, args []string) { sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args)) @@ -30,7 +33,7 @@ func init() { func sourceTest(env []string, format string, cmd []string) { formatter := formatAs(format) - items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute) + items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil) log.Printf("returned %d items", len(items)) log.Printf("wrote %d bytes of state", len(state)) if err != nil { diff --git a/core/execute.go b/core/execute.go index 2f2e647..533055e 100644 --- a/core/execute.go +++ b/core/execute.go @@ -14,7 +14,13 @@ import ( "time" ) -func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) { +func readStdout( + stdout io.ReadCloser, + source string, + postProcess func(item Item) Item, + items chan Item, + cparse chan bool, +) { var item Item parseError := false scanout := bufio.NewScanner(stdout) @@ -25,6 +31,9 @@ func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse cha log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) parseError = true } else { + if postProcess != nil { + item = postProcess(item) + } item.Active = true // These fields aren't up to item.Created = 0 // the action to set and item.Source = source // shouldn't be overrideable @@ -58,6 +67,7 @@ func Execute( state []byte, input string, timeout time.Duration, + postProcess func(item Item) Item, ) ([]Item, []byte, error) { log.Printf("executing %v", argv) @@ -119,7 +129,7 @@ func Execute( // Routines handling the process i/o go writeStdin(stdin, input) - go readStdout(stdout, source, cout, cparse) + go readStdout(stdout, source, postProcess, cout, cparse) go readStderr(stderr, source, cerr) // Kick off the command @@ -165,13 +175,14 @@ func ExecuteItemAction( env []string, state []byte, timeout time.Duration, + postProcess func(item Item) Item, ) (Item, []byte, 
error) { itemJson, err := json.Marshal(item) if err != nil { return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) } - res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout) + res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess) if err != nil { return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) } diff --git a/core/execute_test.go b/core/execute_test.go index 483defb..6a25b43 100644 --- a/core/execute_test.go +++ b/core/execute_test.go @@ -25,7 +25,7 @@ func TestExecute(t *testing.T) { } } execute := func(argv []string) ([]Item, error) { - item, _, err := Execute("_", argv, nil, nil, "", time.Minute) + item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil) return item, err } @@ -43,7 +43,7 @@ func TestExecute(t *testing.T) { assertLen(res, 0) // Timeout - res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond) + res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil) assertNotNil(err) assertLen(res, 0) @@ -56,7 +56,7 @@ func TestExecute(t *testing.T) { } // Read from stdin - res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute) + res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil) assertNil(err) assertLen(res, 1) if res[0].Id != "bar" { @@ -64,7 +64,7 @@ func TestExecute(t *testing.T) { } // Set env - res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute) + res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil) assertNil(err) assertLen(res, 1) if res[0].Id != "baz" { @@ -136,7 +136,7 @@ func TestExecute(t *testing.T) { // Read state argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} - res, 
_, err = Execute("_", argv, nil, []byte("world"), "", time.Minute) + res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute, nil) assertNil(err) assertLen(res, 1) if res[0].Title != "Hello world" { @@ -145,10 +145,22 @@ func TestExecute(t *testing.T) { // Write state argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} - res, newState, err := Execute("_", argv, nil, nil, "", time.Minute) + res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil) assertNil(err) assertLen(res, 1) if string(newState) != "Hello world" { t.Fatalf("expected 'Hello world' from write state, got %s", string(newState)) } + + // Postprocessing function + argv = []string{"jq", "-cn", `{id: "foo"}`} + res, _, err = Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item { + item.Ttl = 123456 + return item + }) + assertNil(err) + assertLen(res, 1) + if res[0].Ttl != 123456 { + t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl) + } } diff --git a/core/source.go b/core/source.go index 2c0f96a..a738d4c 100644 --- a/core/source.go +++ b/core/source.go @@ -1,6 +1,8 @@ package core import ( + "database/sql" + "errors" "fmt" "log" "time" @@ -68,6 +70,50 @@ func SetState(db DB, source string, state []byte) error { return err } +func getSourceTtx(db DB, source string, env string) (int, error) { + row := db.QueryRow(` + select value + from envs + where source = ? + and name = ? 
+ `, source, env) + var ttx int + if err := row.Scan(&ttx); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, err + } + return ttx, nil +} + +func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) { + ttl, err := getSourceTtx(db, source, "INTAKE_TTL") + if err != nil { + return nil, err + } + ttd, err := getSourceTtx(db, source, "INTAKE_TTD") + if err != nil { + return nil, err + } + tts, err := getSourceTtx(db, source, "INTAKE_TTS") + if err != nil { + return nil, err + } + return func(item Item) Item { + if ttl != 0 { + item.Ttl = ttl + } + if ttd != 0 { + item.Ttd = ttd + } + if tts != 0 { + item.Tts = tts + } + return item + }, nil +} + // Given the results of a fetch, add new items, update existing items, and delete expired items. // // Returns the number of new and deleted items on success. @@ -137,6 +183,11 @@ func updateWithFetchedItemsTx( return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err) } + postProcess, err := GetSourcePostProcessor(db, source) + if err != nil { + return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err) + } + // If the source has an on-create trigger, run it for each new item // On-create errors are ignored to avoid failing the fetch onCreateArgv, err := GetArgvForAction(db, source, "on_create") @@ -144,7 +195,7 @@ func updateWithFetchedItemsTx( var updatedNewItems []Item for _, item := range newItems { var updatedItem Item - updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute) + updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess) if err != nil { log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) } diff --git a/core/source_test.go b/core/source_test.go index 9d83695..9d30e83 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -147,7 +147,7 @@ func TestOnCreateAction(t *testing.T) { execute := func(argv []string) 
[]Item { t.Helper() - items, _, err := Execute("test", argv, nil, nil, "", time.Minute) + items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil) if err != nil { t.Fatal("unexpected error executing test fetch") } @@ -299,3 +299,26 @@ func TestSourceState(t *testing.T) { t.Fatalf("expected hello, world, got %s", state) } } + +func TestSourceTtx(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "s"); err != nil { + t.Fatal(err) + } + if err := SetEnvs(db, "s", []string{ + "INTAKE_TTL=30", + "INTAKE_TTD=60", + "INTAKE_TTS=90", + }); err != nil { + t.Fatal(err) + } + postProcess, err := GetSourcePostProcessor(db, "s") + if err != nil { + t.Fatal(err) + } + before := Item{Source: "s", Id: "i"} + after := postProcess(before) + if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 { + t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts) + } +} diff --git a/web/item.go b/web/item.go index 6c415de..82cbd7f 100644 --- a/web/item.go +++ b/web/item.go @@ -72,7 +72,12 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) { return } - newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) + postProcess, err := core.GetSourcePostProcessor(env.db, source) + if err != nil { + log.Fatalf("error: failed to get source post-processor: %v", err) + } + + newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess) if err != nil { http.Error(writer, err.Error(), 500) return