Support INTAKE_TTX env settings

This commit is contained in:
Tim Van Baak 2025-02-05 13:21:31 -08:00
parent fee7c9225b
commit 2894493d34
9 changed files with 138 additions and 17 deletions

View File

@ -102,11 +102,17 @@ Any unspecified field is equivalent to the empty string, object, or 0, depending
Existing items are updated with new values when a fetch or action produces them, with some exceptions: Existing items are updated with new values when a fetch or action produces them, with some exceptions:
* Automatic fields cannot be changed. * Automatic fields cannot be changed.
* If a field's previous value is non-empty and the new value is empty, the old value is kept. * Source-level settings for `ttl`, `ttd`, or `tts` override the item's values.
* Fields cannot be updated from a non-empty value to an empty value.
If a field's previous value is non-empty and the new value is empty, the old value is kept.
### Sources ### Sources
A source is identified by its name. A minimally functional source requires a `fetch` action that returns items. A source is identified by its name.
A minimally functional source requires a `fetch` action that returns items.
TTL, TTD, and TTS can be configured at the source level by setting the environment variables `INTAKE_TTL`, `INTAKE_TTD`, or `INTAKE_TTS` to an integer value.
These values override any `ttl`, `ttd`, or `tts` value returned by a fetch or action.
### Action API ### Action API

View File

@ -109,7 +109,12 @@ func actionExecute(
log.Fatalf("error: failed to get action: %v", err) log.Fatalf("error: failed to get action: %v", err)
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) postProcess, err := core.GetSourcePostProcessor(db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
log.Fatalf("error executing %s: %v", action, err) log.Fatalf("error executing %s: %v", action, err)
} }

View File

@ -58,7 +58,12 @@ func sourceFetch(source string, format string, dryRun bool) {
log.Fatalf("error: failed to get fetch action: %v", err) log.Fatalf("error: failed to get fetch action: %v", err)
} }
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute) postProcess, err := core.GetSourcePostProcessor(db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
items, newState, err := core.Execute(source, argv, envs, state, "", time.Minute, postProcess)
if err != nil { if err != nil {
log.Fatalf("error: failed to execute fetch: %v", err) log.Fatalf("error: failed to execute fetch: %v", err)
} }

View File

@ -14,6 +14,9 @@ var sourceTestCmd = &cobra.Command{
Short: "Test a fetch action", Short: "Test a fetch action",
Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action. Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action.
Source-level configuration that is normally set via environment variable,
such as INTAKE_TTL, will not be applied by --env.
%s`, makeFormatHelpText()), %s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args)) sourceTest(stringArrayArg(cmd, "env"), stringArg(cmd, "format"), getArgv(cmd, args))
@ -30,7 +33,7 @@ func init() {
func sourceTest(env []string, format string, cmd []string) { func sourceTest(env []string, format string, cmd []string) {
formatter := formatAs(format) formatter := formatAs(format)
items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute) items, state, err := core.Execute("test", cmd, env, nil, "", time.Minute, nil)
log.Printf("returned %d items", len(items)) log.Printf("returned %d items", len(items))
log.Printf("wrote %d bytes of state", len(state)) log.Printf("wrote %d bytes of state", len(state))
if err != nil { if err != nil {

View File

@ -14,7 +14,13 @@ import (
"time" "time"
) )
func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) { func readStdout(
stdout io.ReadCloser,
source string,
postProcess func(item Item) Item,
items chan Item,
cparse chan bool,
) {
var item Item var item Item
parseError := false parseError := false
scanout := bufio.NewScanner(stdout) scanout := bufio.NewScanner(stdout)
@ -25,6 +31,9 @@ func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse cha
log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data)))
parseError = true parseError = true
} else { } else {
if postProcess != nil {
item = postProcess(item)
}
item.Active = true // These fields aren't up to item.Active = true // These fields aren't up to
item.Created = 0 // the action to set and item.Created = 0 // the action to set and
item.Source = source // shouldn't be overrideable item.Source = source // shouldn't be overrideable
@ -58,6 +67,7 @@ func Execute(
state []byte, state []byte,
input string, input string,
timeout time.Duration, timeout time.Duration,
postProcess func(item Item) Item,
) ([]Item, []byte, error) { ) ([]Item, []byte, error) {
log.Printf("executing %v", argv) log.Printf("executing %v", argv)
@ -119,7 +129,7 @@ func Execute(
// Routines handling the process i/o // Routines handling the process i/o
go writeStdin(stdin, input) go writeStdin(stdin, input)
go readStdout(stdout, source, cout, cparse) go readStdout(stdout, source, postProcess, cout, cparse)
go readStderr(stderr, source, cerr) go readStderr(stderr, source, cerr)
// Kick off the command // Kick off the command
@ -165,13 +175,14 @@ func ExecuteItemAction(
env []string, env []string,
state []byte, state []byte,
timeout time.Duration, timeout time.Duration,
postProcess func(item Item) Item,
) (Item, []byte, error) { ) (Item, []byte, error) {
itemJson, err := json.Marshal(item) itemJson, err := json.Marshal(item)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err) return Item{}, nil, fmt.Errorf("failed to serialize item: %v", err)
} }
res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout) res, newState, err := Execute(item.Source, argv, env, state, string(itemJson), timeout, postProcess)
if err != nil { if err != nil {
return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err) return Item{}, nil, fmt.Errorf("failed to execute action for %s/%s: %v", item.Source, item.Id, err)
} }

View File

@ -25,7 +25,7 @@ func TestExecute(t *testing.T) {
} }
} }
execute := func(argv []string) ([]Item, error) { execute := func(argv []string) ([]Item, error) {
item, _, err := Execute("_", argv, nil, nil, "", time.Minute) item, _, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
return item, err return item, err
} }
@ -43,7 +43,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0) assertLen(res, 0)
// Timeout // Timeout
res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond) res, _, err = Execute("_", []string{"sleep", "10"}, nil, nil, "", time.Millisecond, nil)
assertNotNil(err) assertNotNil(err)
assertLen(res, 0) assertLen(res, 0)
@ -56,7 +56,7 @@ func TestExecute(t *testing.T) {
} }
// Read from stdin // Read from stdin
res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute) res, _, err = Execute("_", []string{"jq", "-cR", `{id: .}`}, nil, nil, "bar", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "bar" { if res[0].Id != "bar" {
@ -64,7 +64,7 @@ func TestExecute(t *testing.T) {
} }
// Set env // Set env
res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute) res, _, err = Execute("_", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, nil, "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Id != "baz" { if res[0].Id != "baz" {
@ -136,7 +136,7 @@ func TestExecute(t *testing.T) {
// Read state // Read state
argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`} argv := []string{"sh", "-c", `cat $STATE_PATH | jq -cR '{id: "greeting", title: .} | .title = "Hello " + .title'`}
res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute) res, _, err = Execute("_", argv, nil, []byte("world"), "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if res[0].Title != "Hello world" { if res[0].Title != "Hello world" {
@ -145,10 +145,22 @@ func TestExecute(t *testing.T) {
// Write state // Write state
argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`} argv = []string{"sh", "-c", `printf "Hello world" > $STATE_PATH; jq -cn '{id: "test"}'`}
res, newState, err := Execute("_", argv, nil, nil, "", time.Minute) res, newState, err := Execute("_", argv, nil, nil, "", time.Minute, nil)
assertNil(err) assertNil(err)
assertLen(res, 1) assertLen(res, 1)
if string(newState) != "Hello world" { if string(newState) != "Hello world" {
t.Fatalf("expected 'Hello world' from write state, got %s", string(newState)) t.Fatalf("expected 'Hello world' from write state, got %s", string(newState))
} }
// Postprocessing function
argv = []string{"jq", "-cn", `{id: "foo"}`}
res, _, err = Execute("_", argv, nil, nil, "", time.Minute, func(item Item) Item {
item.Ttl = 123456
return item
})
assertNil(err)
assertLen(res, 1)
if res[0].Ttl != 123456 {
t.Fatalf("expected ttl to be set to 123456, got %d", res[0].Ttl)
}
} }

View File

@ -1,6 +1,8 @@
package core package core
import ( import (
"database/sql"
"errors"
"fmt" "fmt"
"log" "log"
"time" "time"
@ -68,6 +70,50 @@ func SetState(db DB, source string, state []byte) error {
return err return err
} }
// getSourceTtx looks up a single source-level setting (for example
// INTAKE_TTL) in the envs table and returns its value as an int.
//
// When the source has no row for env, it returns 0 and a nil error, so
// callers can treat 0 as "not configured". Any other failure from Scan
// (e.g. a stored value that cannot be read as an int) is returned wrapped
// with the variable and source names so the offending setting can be
// identified from the error alone.
func getSourceTtx(db DB, source string, env string) (int, error) {
	row := db.QueryRow(`
		select value
		from envs
		where source = ?
		  and name = ?
	`, source, env)
	var ttx int
	if err := row.Scan(&ttx); err != nil {
		// A missing row just means the setting is not configured.
		if errors.Is(err, sql.ErrNoRows) {
			return 0, nil
		}
		return 0, fmt.Errorf("reading %s for source %q: %w", env, source, err)
	}
	return ttx, nil
}
// GetSourcePostProcessor builds a function that applies the source-level
// INTAKE_TTL, INTAKE_TTD, and INTAKE_TTS settings to an item.
//
// The three settings are read once, in that order, via getSourceTtx; the
// first lookup error aborts and is returned with a nil function. In the
// returned function, each setting with a nonzero value overwrites the
// matching Ttl, Ttd, or Tts field of the item; a setting of 0 leaves the
// item's own value in place.
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) {
	var ttl, ttd, tts int

	// Pair each env name with the variable that receives its value.
	lookups := []struct {
		name string
		dest *int
	}{
		{"INTAKE_TTL", &ttl},
		{"INTAKE_TTD", &ttd},
		{"INTAKE_TTS", &tts},
	}
	for _, lookup := range lookups {
		value, err := getSourceTtx(db, source, lookup.name)
		if err != nil {
			return nil, err
		}
		*lookup.dest = value
	}

	override := func(item Item) Item {
		if ttl != 0 {
			item.Ttl = ttl
		}
		if ttd != 0 {
			item.Ttd = ttd
		}
		if tts != 0 {
			item.Tts = tts
		}
		return item
	}
	return override, nil
}
// Given the results of a fetch, add new items, update existing items, and delete expired items. // Given the results of a fetch, add new items, update existing items, and delete expired items.
// //
// Returns the number of new and deleted items on success. // Returns the number of new and deleted items on success.
@ -137,6 +183,11 @@ func updateWithFetchedItemsTx(
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err) return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
} }
postProcess, err := GetSourcePostProcessor(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
}
// If the source has an on-create trigger, run it for each new item // If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch // On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create") onCreateArgv, err := GetArgvForAction(db, source, "on_create")
@ -144,7 +195,7 @@ func updateWithFetchedItemsTx(
var updatedNewItems []Item var updatedNewItems []Item
for _, item := range newItems { for _, item := range newItems {
var updatedItem Item var updatedItem Item
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute) updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err) log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
} }

View File

@ -147,7 +147,7 @@ func TestOnCreateAction(t *testing.T) {
execute := func(argv []string) []Item { execute := func(argv []string) []Item {
t.Helper() t.Helper()
items, _, err := Execute("test", argv, nil, nil, "", time.Minute) items, _, err := Execute("test", argv, nil, nil, "", time.Minute, nil)
if err != nil { if err != nil {
t.Fatal("unexpected error executing test fetch") t.Fatal("unexpected error executing test fetch")
} }
@ -299,3 +299,26 @@ func TestSourceState(t *testing.T) {
t.Fatalf("expected hello, world, got %s", state) t.Fatalf("expected hello, world, got %s", state)
} }
} }
// TestSourceTtx checks that INTAKE_TTL, INTAKE_TTD, and INTAKE_TTS envs set
// on a source are applied to an item by the source's post-processor.
func TestSourceTtx(t *testing.T) {
	db := EphemeralDb(t)

	err := AddSource(db, "s")
	if err != nil {
		t.Fatal(err)
	}

	settings := []string{"INTAKE_TTL=30", "INTAKE_TTD=60", "INTAKE_TTS=90"}
	err = SetEnvs(db, "s", settings)
	if err != nil {
		t.Fatal(err)
	}

	postProcess, err := GetSourcePostProcessor(db, "s")
	if err != nil {
		t.Fatal(err)
	}

	// The input item leaves all three fields at zero, so each value seen
	// afterwards must have come from the source-level settings.
	got := postProcess(Item{Source: "s", Id: "i"})
	if got.Ttl == 30 && got.Ttd == 60 && got.Tts == 90 {
		return
	}
	t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", got.Ttl, got.Ttd, got.Tts)
}

View File

@ -72,7 +72,12 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return return
} }
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute) postProcess, err := core.GetSourcePostProcessor(env.db, source)
if err != nil {
log.Fatalf("error: failed to get source post-processor: %v", err)
}
newItem, newState, err := core.ExecuteItemAction(item, argv, envs, state, time.Minute, postProcess)
if err != nil { if err != nil {
http.Error(writer, err.Error(), 500) http.Error(writer, err.Error(), 500)
return return