Implement source batcher

This commit is contained in:
Tim Van Baak 2025-02-24 09:44:26 -08:00
parent 1c8ec5ff8f
commit 61458fb9fd
3 changed files with 89 additions and 5 deletions

View File

@ -103,7 +103,43 @@ func getSourceTtx(db DB, source string, env string) (int, error) {
return ttx, nil return ttx, nil
} }
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) { func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts int), error) {
row := db.QueryRow(`
select value
from envs
where source = ?
and name = 'INTAKE_BATCH'
`, source)
var batchSpec string
if err := row.Scan(&batchSpec); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("failed to get batch spec: %v", err)
}
var hour, minute int
if _, err := fmt.Sscanf(batchSpec, "%d:%d", &hour, &minute); err != nil {
return nil, fmt.Errorf("failed to parse batch spec: %v", err)
}
log.Printf("%s batch %d %d", source, hour, minute)
return func(createdTime time.Time) (tts int) {
// There is a trivial inaccuracy in using time.Now() instead of item.Created, but
// the latter isn't populated for new items when postprocessing occurs.
// Since postprocessing is only applied to new items post-fetch, this is close enough.
// Ideally: createdTime := time.Unix(int64(created), 0).UTC()
batchCutoff := time.Date(createdTime.Year(), createdTime.Month(), createdTime.Day(), hour, minute, 0, 0, time.UTC)
if createdTime.After(batchCutoff) {
batchCutoff = batchCutoff.AddDate(0, 0, 1)
}
tts = int(batchCutoff.Sub(createdTime).Seconds())
return
}, nil
}
func GetSourcePostProcessor(db DB, source string) (func(item Item, now time.Time) Item, error) {
ttl, err := getSourceTtx(db, source, "INTAKE_TTL") ttl, err := getSourceTtx(db, source, "INTAKE_TTL")
if err != nil { if err != nil {
return nil, err return nil, err
@ -116,14 +152,20 @@ func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return func(item Item) Item { batchTts, err := getSourceBatcher(db, source)
if err != nil {
return nil, err
}
return func(item Item, now time.Time) Item {
if ttl != 0 { if ttl != 0 {
item.Ttl = ttl item.Ttl = ttl
} }
if ttd != 0 { if ttd != 0 {
item.Ttd = ttd item.Ttd = ttd
} }
if tts != 0 { if batchTts != nil {
item.Tts = batchTts(now)
} else if tts != 0 {
item.Tts = tts item.Tts = tts
} }
return item return item
@ -226,7 +268,7 @@ func updateWithFetchedItemsTx(
// Apply post-processing to the new items // Apply post-processing to the new items
if postProcess != nil { if postProcess != nil {
for i := range newItems { for i := range newItems {
newItems[i] = postProcess(newItems[i]) newItems[i] = postProcess(newItems[i], now)
} }
} }

View File

@ -317,7 +317,7 @@ func TestSourcePostProcessor(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
before := Item{Source: "s", Id: "i"} before := Item{Source: "s", Id: "i"}
after := postProcess(before) after := postProcess(before, time.Now())
if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 { if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 {
t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts) t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts)
} }
@ -351,6 +351,46 @@ func TestSourceUpdateAppliesPostProcess(t *testing.T) {
} }
} }
func TestSourceBatching(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_BATCH=00:00",
}); err != nil {
t.Fatal(err)
}
now := time.Now().Add(-1 * time.Second)
item1 := Item{Source: "s", Id: "i"}
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item1}, now)
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
item2 := Item{Source: "s", Id: "j"}
add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(time.Second))
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
items, err := GetAllItemsForSource(db, "s", 0, 100)
if err != nil {
t.Fatal(err)
}
if items[0].Id == "i" {
item1 = items[0]
item2 = items[1]
} else {
item1 = items[1]
item2 = items[0]
}
if item2.Tts != item1.Tts-1 {
t.Fatalf("expected different tts based on batch time, for %d and %d", item1.Tts, item2.Tts)
}
}
func TestSourceLastUpdated(t *testing.T) { func TestSourceLastUpdated(t *testing.T) {
db := EphemeralDb(t) db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil { if err := AddSource(db, "s"); err != nil {

View File

@ -59,6 +59,8 @@ tmp/intake source env -s nothing --set "INTAKE_FETCH=every 5m"
tmp/intake source add -s hello tmp/intake source add -s hello
tmp/intake action add -s hello -a fetch -- sh -c "cat /dev/random | base32 | head -c8 | jq -cR '{id: ., title: \"Hello\"}'" tmp/intake action add -s hello -a fetch -- sh -c "cat /dev/random | base32 | head -c8 | jq -cR '{id: ., title: \"Hello\"}'"
tmp/intake source env -s hello --set "INTAKE_FETCH=every 1m" tmp/intake source env -s hello --set "INTAKE_FETCH=every 1m"
# and batch
tmp/intake source env -s hello --set "INTAKE_BATCH=00:00"
# default password, comment out to test no password # default password, comment out to test no password
echo "hello" | tmp/intake passwd --stdin echo "hello" | tmp/intake passwd --stdin