diff --git a/README.md b/README.md index 0fbe59e..7abf367 100644 --- a/README.md +++ b/README.md @@ -138,7 +138,7 @@ Instead, the web interface can be locked behind a password set via `intake passw * [ ] TUI feed view * [ ] Escape HTML in logs saved to error items * [ ] Mark which sources have INTAKE_FETCH and which don't -* [ ] Support fetch specs as batch specs so you can e.g. batch to a dow +* [x] Support fetch specs as batch specs so you can e.g. batch to a dow * [ ] Allow desktop-only sources that are invisible on mobile * [ ] Items supporting a nonexistent action should gray out the action button * [ ] Mark a source as expected to produce items so it errors if it produces 0 (INTAKE_MIN) diff --git a/core/source.go b/core/source.go index 1a6e7f9..78395bb 100644 --- a/core/source.go +++ b/core/source.go @@ -134,9 +134,9 @@ func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts in return nil, fmt.Errorf("failed to get batch spec: %v", err) } - var hour, minute int - if _, err := fmt.Sscanf(batchSpec, "%d:%d", &hour, &minute); err != nil { - return nil, fmt.Errorf("failed to parse batch spec: %v", err) + spec, err := ParseCronSpec(batchSpec) + if err != nil { + return nil, fmt.Errorf("failed to parse batch spec: %w", err) } return func(createdTime time.Time) (tts int) { @@ -144,10 +144,7 @@ func getSourceBatcher(db DB, source string) (func(createdTime time.Time) (tts in // the latter isn't populated for new items when postprocessing occurs. // Since postprocessing is only applied to new items post-fetch, this is close enough. // Ideally: createdTime := time.Unix(int64(created), 0).UTC() - batchCutoff := time.Date(createdTime.Year(), createdTime.Month(), createdTime.Day(), hour, minute, 0, 0, time.UTC) - if createdTime.After(batchCutoff) { - batchCutoff = batchCutoff.AddDate(0, 0, 1) - } + batchCutoff := spec.GetNextTime(createdTime) tts = int(batchCutoff.Sub(createdTime).Seconds()) return }, nil diff --git a/core/source_test.go b/core/source_test.go index f959154..eb9a39a 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -351,7 +351,7 @@ func TestSourceUpdateAppliesPostProcess(t *testing.T) { } } -func TestSourceBatching(t *testing.T) { +func TestSourceBatchSetsTts(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil { t.Fatal(err) @@ -363,18 +363,21 @@ func TestSourceBatching(t *testing.T) { } now := time.Now().Add(-1 * time.Second) + // Create item 1, which should get a tts from the batching item1 := Item{Source: "s", Id: "i"} add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item1}, now) if add != 1 || del != 0 || err != nil { t.Fatalf("expected 1 add, got %d and err %v", add, err) } + // Create item 2 one second later, giving it a different tts item2 := Item{Source: "s", Id: "j"} add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(time.Second)) if add != 1 || del != 0 || err != nil { t.Fatalf("expected 1 add, got %d and err %v", add, err) } + // item 2 should have a tts one second shorter than item 1 so they show at the same time items, err := GetAllItemsForSource(db, "s", 0, 100) if err != nil { t.Fatal(err) @@ -385,6 +388,7 @@ func TestSourceBatching(t *testing.T) { t.Fatalf("expected different tts based on batch time, for %d and %d", item1.Tts, item2.Tts) } + // Update item 2 ten seconds later add, del, err = UpdateWithFetchedItems(db, "s", nil, []Item{item2}, now.Add(10*time.Second)) if add != 0 || del != 0 || err != nil { t.Fatalf("expected only updates: %v", err) @@ -397,11 +401,142 @@ func TestSourceBatching(t *testing.T) { if itemsAgain[1].Id != item2.Id { t.Error("unexpected return order") } + // The previous tts should remain instead of tts being reset if itemsAgain[1].Tts != item2.Tts { t.Fatalf("batch tts not backfilled: %d vs %d", item2.Tts, itemsAgain[0].Tts) } } +func TestSourceBatchToEverySpec(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "s"); err != nil { + t.Fatal(err) + } + if err := SetEnvs(db, "s", []string{ + "INTAKE_BATCH=every 5m", + }); err != nil { + t.Fatal(err) + } + + now := time.Now().UTC().Truncate(time.Hour) // now is a multiple of 5m + + // Add an item two minutes into the 5m batch + item := Item{Source: "s", Id: "i"} + add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, now.Add(2*time.Minute)) + if add != 1 || del != 0 || err != nil { + t.Fatalf("expected 1 add, got %d and err %v", add, err) + } + + // The tts should be 3 minutes to align to the 5m batch + items, err := GetAllItemsForSource(db, "s", 0, 100) + if err != nil { + t.Fatal(err) + } + if len(items) != 1 { + t.Fatalf("expected 1 item, got %d", len(items)) + } + expectedTts := int(3 * time.Minute.Seconds()) + if items[0].Tts != expectedTts { + t.Fatalf("expected tts to be %d, got %d", expectedTts, items[0].Tts) + } +} + +func TestSourceBatchToAtSpec(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "s"); err != nil { + t.Fatal(err) + } + if err := SetEnvs(db, "s", []string{ + "INTAKE_BATCH=at 01:00,02:00", + }); err != nil { + t.Fatal(err) + } + + tests := []struct { + time time.Time + expected int + }{ + // 00:30 batches to 01:00, 30m + {time.Date(2023, 1, 1, 0, 30, 0, 0, time.UTC), int(30 * time.Minute.Seconds())}, + // 01:15 batches to 02:00, 45m + {time.Date(2023, 1, 1, 1, 15, 0, 0, time.UTC), int(45 * time.Minute.Seconds())}, + // 02:15 batches to 01:00 the following day, 22h45m + {time.Date(2023, 1, 1, 2, 15, 0, 0, time.UTC), int((22*time.Hour + 45*time.Minute).Seconds())}, + } + + for _, test := range tests { + item := Item{Source: "s", Id: test.time.Format("150405")} + add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, test.time) + if add != 1 || del != 0 || err != nil { + t.Fatalf("expected 1 add, got %d and err %v", add, err) + } + + items, err := GetAllItemsForSource(db, "s", 0, 100) + if err != nil { + t.Fatal(err) + } + if len(items) != 1 { + t.Fatalf("expected 1 item, got %d", len(items)) + } + if items[0].Tts != test.expected { + t.Fatalf("expected tts to be %d, got %d", test.expected, items[0].Tts) + } + + // Clean up for the next test case + if _, err := DeleteItem(db, "s", item.Id); err != nil { + t.Fatalf("failed to clean up item: %v", err) + } + } +} + +func TestSourceBatchToOnSpec(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "s"); err != nil { + t.Fatal(err) + } + if err := SetEnvs(db, "s", []string{ + "INTAKE_BATCH=on Mon,Wed", + }); err != nil { + t.Fatal(err) + } + + tests := []struct { + time time.Time + expected int + }{ + // Sun 12:00 batches to Mon 00:00, 12h + {time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), int(12 * time.Hour.Seconds())}, + // Tue 06:00 batches to Wed 00:00, 18h + {time.Date(2023, 1, 3, 6, 0, 0, 0, time.UTC), int(18 * time.Hour.Seconds())}, + // Wed 02:00 batches to Mon 00:00, 24x4 + 22 = 118h + {time.Date(2023, 1, 4, 2, 0, 0, 0, time.UTC), int((118 * time.Hour).Seconds())}, + } + + for _, test := range tests { + item := Item{Source: "s", Id: test.time.Format("150405")} + add, del, err := UpdateWithFetchedItems(db, "s", nil, []Item{item}, test.time) + if add != 1 || del != 0 || err != nil { + t.Fatalf("expected 1 add, got %d and err %v", add, err) + } + + items, err := GetAllItemsForSource(db, "s", 0, 100) + if err != nil { + t.Fatal(err) + } + if len(items) != 1 { + t.Fatalf("expected 1 item, got %d", len(items)) + } + if items[0].Tts != test.expected { + t.Fatalf("expected tts to be %d, got %d", test.expected, items[0].Tts) + } + + // Clean up for the next test case + if _, err := DeleteItem(db, "s", item.Id); err != nil { + t.Fatalf("failed to clean up item: %v", err) + } + } +} + func TestSourceLastUpdated(t *testing.T) { db := EphemeralDb(t) if err := AddSource(db, "s"); err != nil {