Support fetch spec formats for batching

This commit is contained in:
Tim Van Baak 2025-05-03 22:19:41 -07:00
parent 6898825f3c
commit ec29fbc67f
3 changed files with 141 additions and 9 deletions

View File

@ -138,7 +138,7 @@ Instead, the web interface can be locked behind a password set via `intake passw
* [ ] TUI feed view
* [ ] Escape HTML in logs saved to error items
* [ ] Mark which sources have INTAKE_FETCH and which don't
* [ ] Support fetch specs as batch specs so you can e.g. batch to a dow
* [x] Support fetch specs as batch specs so you can e.g. batch to a dow
* [ ] Allow desktop-only sources that are invisible on mobile
* [ ] Items supporting a nonexistent action should gray out the action button
* [ ] Mark a source as expected to produce items so it errors if it produces 0 (INTAKE_MIN)

View File

@ -134,9 +134,9 @@ func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts in
return nil, fmt.Errorf("failed to get batch spec: %v", err)
}
var hour, minute int
if _, err := fmt.Sscanf(batchSpec, "%d:%d", &hour, &minute); err != nil {
return nil, fmt.Errorf("failed to parse batch spec: %v", err)
spec, err := ParseCronSpec(batchSpec)
if err != nil {
return nil, fmt.Errorf("failed to parse batch spec: %w", err)
}
return func(createdTime time.Time) (tts int) {
@ -144,10 +144,7 @@ func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts in
// the latter isn't populated for new items when postprocessing occurs.
// Since postprocessing is only applied to new items post-fetch, this is close enough.
// Ideally: createdTime := time.Unix(int64(created), 0).UTC()
batchCutoff := time.Date(createdTime.Year(), createdTime.Month(), createdTime.Day(), hour, minute, 0, 0, time.UTC)
if createdTime.After(batchCutoff) {
batchCutoff = batchCutoff.AddDate(0, 0, 1)
}
batchCutoff := spec.GetNextTime(createdTime)
tts = int(batchCutoff.Sub(createdTime).Seconds())
return
}, nil

View File

@ -351,7 +351,7 @@ func TestSourceUpdateAppliesPostProcess(t *testing.T) {
}
}
func TestSourceBatching(t *testing.T) {
func TestSourceBatchSetsTts(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
@ -363,18 +363,21 @@ func TestSourceBatching(t *testing.T) {
}
now := time.Now().Add(-1 * time.Second)
// Create item 1, which should get a tts from the batching
item1 := Item{Source: "s", Id: "i"}
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item1}, now)
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
// Create item 2 one second later, giving it a different tts
item2 := Item{Source: "s", Id: "j"}
add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(time.Second))
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
// item 2 should have a tts one second shorter than item 1 so they show at the same time
items, err := GetAllItemsForSource(db, "s", 0, 100)
if err != nil {
t.Fatal(err)
@ -385,6 +388,7 @@ func TestSourceBatching(t *testing.T) {
t.Fatalf("expected different tts based on batch time, for %d and %d", item1.Tts, item2.Tts)
}
// Update item 2 ten seconds later
add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(10*time.Second))
if add != 0 || del != 0 || err != nil {
t.Fatalf("expected only updates: %v", err)
@ -397,11 +401,142 @@ func TestSourceBatching(t *testing.T) {
if itemsAgain[1].Id != item2.Id {
t.Error("unexpected return order")
}
// The previous tts should remain instead of tts being reset
if itemsAgain[1].Tts != item2.Tts {
t.Fatalf("batch tts not backfilled: %d vs %d", item2.Tts, itemsAgain[0].Tts)
}
}
func TestSourceBatchToEverySpec(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_BATCH=every 5m",
}); err != nil {
t.Fatal(err)
}
now := time.Now().UTC().Truncate(time.Hour) // now is a multiple of 5m
// Add an item two minutes into the 5m batch
item := Item{Source: "s", Id: "i"}
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, now.Add(2*time.Minute))
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
// The tts should be 3 minutes to align to the 5m batch
items, err := GetAllItemsForSource(db, "s", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("expected 1 item, got %d", len(items))
}
expectedTts := int(3 * time.Minute.Seconds())
if items[0].Tts != expectedTts {
t.Fatalf("expected tts to be %d, got %d", expectedTts, items[0].Tts)
}
}
func TestSourceBatchToAtSpec(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_BATCH=at 01:00,02:00",
}); err != nil {
t.Fatal(err)
}
tests := []struct {
time time.Time
expected int
}{
// 00:30 batches to 01:00, 30m
{time.Date(2023, 1, 1, 0, 30, 0, 0, time.UTC), int(30 * time.Minute.Seconds())},
// 01:15 batches to 02:00, 45m
{time.Date(2023, 1, 1, 1, 15, 0, 0, time.UTC), int(45 * time.Minute.Seconds())},
// 02:15 batches to 01:00 the following day, 22h45m
{time.Date(2023, 1, 1, 2, 15, 0, 0, time.UTC), int((22*time.Hour + 45*time.Minute).Seconds())},
}
for _, test := range tests {
item := Item{Source: "s", Id: test.time.Format("150405")}
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, test.time)
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
items, err := GetAllItemsForSource(db, "s", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("expected 1 item, got %d", len(items))
}
if items[0].Tts != test.expected {
t.Fatalf("expected tts to be %d, got %d", test.expected, items[0].Tts)
}
// Clean up for the next test case
if _, err := DeleteItem(db, "s", item.Id); err != nil {
t.Fatalf("failed to clean up item: %v", err)
}
}
}
func TestSourceBatchToOnSpec(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {
t.Fatal(err)
}
if err := SetEnvs(db, "s", []string{
"INTAKE_BATCH=on Mon,Wed",
}); err != nil {
t.Fatal(err)
}
tests := []struct {
time time.Time
expected int
}{
// Sun 12:00 batches to Mon 00:00, 12h
{time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), int(12 * time.Hour.Seconds())},
// Tue 06:00 batches to Wed 00:00, 18h
{time.Date(2023, 1, 3, 6, 0, 0, 0, time.UTC), int(18 * time.Hour.Seconds())},
// Wed 02:00 batches to Mon 00:00, 24x4 + 22 = 118h
{time.Date(2023, 1, 4, 2, 0, 0, 0, time.UTC), int((118 * time.Hour).Seconds())},
}
for _, test := range tests {
item := Item{Source: "s", Id: test.time.Format("150405")}
add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, test.time)
if add != 1 || del != 0 || err != nil {
t.Fatalf("expected 1 add, got %d and err %v", add, err)
}
items, err := GetAllItemsForSource(db, "s", 0, 100)
if err != nil {
t.Fatal(err)
}
if len(items) != 1 {
t.Fatalf("expected 1 item, got %d", len(items))
}
if items[0].Tts != test.expected {
t.Fatalf("expected tts to be %d, got %d", test.expected, items[0].Tts)
}
// Clean up for the next test case
if _, err := DeleteItem(db, "s", item.Id); err != nil {
t.Fatalf("failed to clean up item: %v", err)
}
}
}
func TestSourceLastUpdated(t *testing.T) {
db := EphemeralDb(t)
if err := AddSource(db, "s"); err != nil {