diff --git a/core/source.go b/core/source.go index 907373e..d4c2b5c 100644 --- a/core/source.go +++ b/core/source.go @@ -103,7 +103,43 @@ func getSourceTtx(db DB, source string, env string) (int, error) { return ttx, nil } -func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) { +func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts int), error) { + row := db.QueryRow(` + select value + from envs + where source = ? + and name = 'INTAKE_BATCH' + `, source) + var batchSpec string + if err := row.Scan(&batchSpec); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("failed to get batch spec: %v", err) + } + + var hour, minute int + if _, err := fmt.Sscanf(batchSpec, "%d:%d", &hour, &minute); err != nil { + return nil, fmt.Errorf("failed to parse batch spec: %v", err) + } + + log.Printf("%s batch %d %d", source, hour, minute) + + return func(createdTime time.Time) (tts int) { + // There is a trivial inaccuracy in using time.Now() instead of item.Created, but + // the latter isn't populated for new items when postprocessing occurs. + // Since postprocessing is only applied to new items post-fetch, this is close enough. + // Ideally: createdTime := time.Unix(int64(created), 0).UTC() + batchCutoff := time.Date(createdTime.Year(), createdTime.Month(), createdTime.Day(), hour, minute, 0, 0, time.UTC) + if createdTime.After(batchCutoff) { + batchCutoff = batchCutoff.AddDate(0, 0, 1) + } + tts = int(batchCutoff.Sub(createdTime).Seconds()) + return + }, nil +} + +func GetSourcePostProcessor(db DB, source string) (func(item Item, now time.Time) Item, error) { ttl, err := getSourceTtx(db, source, "INTAKE_TTL") if err != nil { return nil, err @@ -116,14 +152,20 @@ func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) if err != nil { return nil, err } - return func(item Item) Item { + batchTts, err := getSourceBatcher(db, source) + if err != nil { + return nil, err + } + return func(item Item, now time.Time) Item { if ttl != 0 { item.Ttl = ttl } if ttd != 0 { item.Ttd = ttd } - if tts != 0 { + if batchTts != nil { + item.Tts = batchTts(now) + } else if tts != 0 { item.Tts = tts } return item @@ -226,7 +268,7 @@ func updateWithFetchedItemsTx( // Apply post-processing to the new items if postProcess != nil { for i := range newItems { - newItems[i] = postProcess(newItems[i]) + newItems[i] = postProcess(newItems[i], now) } } diff --git a/core/source_test.go b/core/source_test.go index f395f23..bb90ba7 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -317,7 +317,7 @@ func TestSourcePostProcessor(t *testing.T) { t.Fatal(err) } before := Item{Source: "s", Id: "i"} - after := postProcess(before) + after := postProcess(before, time.Now()) if after.Ttl != 30 || after.Ttd != 60 || after.Tts != 90 { t.Fatalf("Missing value after postProcess: ttl = %d, ttd = %d, tts = %d", after.Ttl, after.Ttd, after.Tts) } @@ -351,6 +351,46 @@ func TestSourceUpdateAppliesPostProcess(t *testing.T) { } } +func TestSourceBatching(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "s"); err != nil { + t.Fatal(err) + } + if err := SetEnvs(db, "s", []string{ + "INTAKE_BATCH=00:00", + }); err != nil { + t.Fatal(err) + } + now := time.Now().Add(-1 * time.Second) + + item1 := Item{Source: "s", Id: "i"} + add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item1}, now) + if add != 1 || del != 0 || err != nil { + t.Fatalf("expected 1 add, got %d and err %v", add, err) + } + + item2 := Item{Source: "s", Id: "j"} + add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(time.Second)) + if add != 1 || del != 0 || err != nil { + t.Fatalf("expected 1 add, got %d and err %v", add, err) + } + + items, err := GetAllItemsForSource(db, "s", 0, 100) + if err != nil { + t.Fatal(err) + } + if items[0].Id == "i" { + item1 = items[0] + item2 = items[1] + } else { + item1 = items[1] + item2 = items[0] + } + if item2.Tts != item1.Tts-1 { + t.Fatalf("expected different tts based on batch time, for %d and %d", item1.Tts, item2.Tts) + } +} + func TestSourceLastUpdated(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { diff --git a/test/test_items.sh b/test/test_items.sh index 3b2a4c2..311397e 100755 --- a/test/test_items.sh +++ b/test/test_items.sh @@ -59,6 +59,8 @@ tmp/intake source env -s nothing --set "INTAKE_FETCH=every 5m" tmp/intake source add -s hello tmp/intake action add -s hello -a fetch -- sh -c "cat /dev/random | base32 | head -c8 | jq -cR '{id: ., title: \"Hello\"}'" tmp/intake source env -s hello --set "INTAKE_FETCH=every 1m" +# and batch +tmp/intake source env -s hello --set "INTAKE_BATCH=00:00" # default password, comment out to test no password echo "hello" | tmp/intake passwd --stdin