Support tt{l,d} in source updates
This commit is contained in:
parent
d6a49593b7
commit
f7dd694b67
@ -74,7 +74,7 @@ func sourceFetch(source string, format string, dryRun bool) {
|
||||
return
|
||||
}
|
||||
|
||||
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items)
|
||||
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items, time.Now())
|
||||
if err != nil {
|
||||
log.Fatalf("error: failed to update: %v", err)
|
||||
}
|
||||
|
21
core/item.go
21
core/item.go
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Actions map[string]json.RawMessage
|
||||
@ -33,8 +34,26 @@ type Item struct {
|
||||
Action Actions `json:"action"`
|
||||
}
|
||||
|
||||
func (item Item) TtlTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Ttl), 0)
|
||||
}
|
||||
|
||||
func (item Item) TtdTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Ttd), 0)
|
||||
}
|
||||
|
||||
func (item Item) TtsTime() time.Time {
|
||||
return time.Unix(int64(item.Created)+int64(item.Tts), 0)
|
||||
}
|
||||
|
||||
// Whether an item that no longer appears in a fetch can be deleted.
|
||||
func (item Item) Deletable() bool {
|
||||
func (item Item) Deletable(now time.Time) bool {
|
||||
if item.Ttl != 0 && item.TtlTime().After(now) {
|
||||
return false
|
||||
}
|
||||
if item.Ttd != 0 && item.TtdTime().Before(now) {
|
||||
return true
|
||||
}
|
||||
return !item.Active
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package core
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestItemFormatsExist(t *testing.T) {
|
||||
@ -49,3 +50,35 @@ func TestItemRoundTrip(t *testing.T) {
|
||||
t.Fatalf("items are not equal, err %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestItemLifecycleTimes(t *testing.T) {
|
||||
db := EphemeralDb(t)
|
||||
if err := AddSource(db, "_"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
||||
// active item with expired ttd is deleted
|
||||
add, del, err := UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttd: 1}}, now)
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
|
||||
if add != 0 || del != 1 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
|
||||
// inactive item with live ttl is kept
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{{Source: "_", Id: "_", Ttl: 60}}, now)
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
if _, err = DeactivateItem(db, "_", "_"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "_", nil, []Item{}, now.Add(time.Second*5))
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("unexpected update result: %d %d %v", add, del, err)
|
||||
}
|
||||
}
|
||||
|
@ -71,19 +71,31 @@ func SetState(db DB, source string, state []byte) error {
|
||||
// Given the results of a fetch, add new items, update existing items, and delete expired items.
|
||||
//
|
||||
// Returns the number of new and deleted items on success.
|
||||
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||
func UpdateWithFetchedItems(
|
||||
db DB,
|
||||
source string,
|
||||
state []byte,
|
||||
items []Item,
|
||||
now time.Time,
|
||||
) (int, int, error) {
|
||||
var new int
|
||||
var del int
|
||||
var err error
|
||||
err = db.Transact(func(tx DB) error {
|
||||
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
|
||||
new, del, err = updateWithFetchedItemsTx(tx, source, state, items, now)
|
||||
return err
|
||||
})
|
||||
return new, del, err
|
||||
}
|
||||
|
||||
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
|
||||
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||
func updateWithFetchedItemsTx(
|
||||
db DB,
|
||||
source string,
|
||||
state []byte,
|
||||
items []Item,
|
||||
now time.Time,
|
||||
) (int, int, error) {
|
||||
// Get the existing items
|
||||
existingItems, err := GetAllItemsForSource(db, source)
|
||||
if err != nil {
|
||||
@ -154,7 +166,7 @@ func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item)
|
||||
// Check expired items for deletion
|
||||
idsToDelete := map[string]bool{}
|
||||
for _, item := range existingItems {
|
||||
if expiredIds[item.Id] && item.Deletable() {
|
||||
if expiredIds[item.Id] && item.Deletable(now) {
|
||||
idsToDelete[item.Id] = true
|
||||
}
|
||||
}
|
||||
|
@ -50,20 +50,24 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
||||
if err := AddSource(db, "test"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
update := func(items []Item) (int, int, error) {
|
||||
t.Helper()
|
||||
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
|
||||
}
|
||||
|
||||
a := Item{Source: "test", Id: "a"}
|
||||
add, del, err := UpdateWithFetchedItems(db, "test", nil, []Item{a})
|
||||
add, del, err := update([]Item{a})
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a})
|
||||
add, del, err = update([]Item{a})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
b := Item{Source: "test", Id: "b"}
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
|
||||
add, del, err = update([]Item{a, b})
|
||||
if add != 1 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
@ -71,17 +75,17 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
||||
if _, err = DeactivateItem(db, "test", "a"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{a, b})
|
||||
add, del, err = update([]Item{a, b})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
|
||||
add, del, err = update([]Item{b})
|
||||
if add != 0 || del != 1 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
|
||||
add, del, err = UpdateWithFetchedItems(db, "test", nil, []Item{b})
|
||||
add, del, err = update([]Item{b})
|
||||
if add != 0 || del != 0 || err != nil {
|
||||
t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
|
||||
}
|
||||
@ -97,7 +101,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
|
||||
b := Item{Source: "s", Id: "b"}
|
||||
|
||||
// Add and deactivate a so it will be deleted on next fetch without it
|
||||
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
|
||||
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}, time.Now()); add != 1 || err != nil {
|
||||
t.Fatalf("expected 1 add, got %d and err %v", add, err)
|
||||
}
|
||||
if _, err := DeactivateItem(db, "s", "a"); err != nil {
|
||||
@ -114,7 +118,7 @@ func TestUpdateSourceTransaction(t *testing.T) {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
|
||||
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b}, time.Now())
|
||||
if add != 0 || del != 0 || err == nil {
|
||||
t.Fatalf("expected failure, got %d %d %v", add, del, err)
|
||||
}
|
||||
@ -153,6 +157,11 @@ func TestOnCreateAction(t *testing.T) {
|
||||
return items
|
||||
}
|
||||
|
||||
update := func(items []Item) (int, int, error) {
|
||||
t.Helper()
|
||||
return UpdateWithFetchedItems(db, "test", nil, items, time.Now())
|
||||
}
|
||||
|
||||
onCreate := func(argv []string) {
|
||||
t.Helper()
|
||||
if err := UpdateAction(db, "test", "on_create", argv); err != nil {
|
||||
@ -172,7 +181,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// Noop on_create works
|
||||
onCreate([]string{"tee"})
|
||||
items := execute([]string{"jq", "-cn", `{id: "one"}`})
|
||||
add, _, err := UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err := update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with noop oncreate")
|
||||
}
|
||||
@ -188,7 +197,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Title != "Hello, World" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with alter oncreate")
|
||||
}
|
||||
@ -203,7 +212,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Link != "" {
|
||||
t.Fatal("unexpected link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with augment oncreate")
|
||||
}
|
||||
@ -218,7 +227,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Link != "gopher://go.dev" {
|
||||
t.Fatal("missing link")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with attempted deletion oncreate")
|
||||
}
|
||||
@ -229,7 +238,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// item is created if on_create fails
|
||||
onCreate([]string{"false"})
|
||||
items = execute([]string{"jq", "-cn", `{id: "five"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with failing oncreate")
|
||||
}
|
||||
@ -243,7 +252,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
if items[0].Title != "before" {
|
||||
t.Fatal("unexpected title")
|
||||
}
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with bad exit code oncreate")
|
||||
}
|
||||
@ -254,7 +263,7 @@ func TestOnCreateAction(t *testing.T) {
|
||||
// on_create can't change id, active, or created
|
||||
onCreate([]string{"jq", "-c", `.id = "seven"; .active = false; .created = 123456`})
|
||||
items = execute([]string{"jq", "-cn", `{id: "seven"}`})
|
||||
add, _, err = UpdateWithFetchedItems(db, "test", nil, items)
|
||||
add, _, err = update(items)
|
||||
if add != 1 || err != nil {
|
||||
t.Fatal("failed update with invalid field changes oncreate")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user