Make item updates transactional
This commit is contained in:
parent
b769d71f6e
commit
37142229ea
@ -143,11 +143,15 @@ func actionExecute(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = core.UpdateItems(db, []core.Item{newItem}); err != nil {
|
if err = db.Transact(func(tx core.DB) error {
|
||||||
log.Fatalf("error: failed to update item: %v", err)
|
if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
|
||||||
}
|
return fmt.Errorf("failed to update item: %v", _err)
|
||||||
|
}
|
||||||
if err = core.SetState(db, source, newState); err != nil {
|
if _err := core.SetState(tx, source, newState); err != nil {
|
||||||
log.Fatalf("error: failed to set state for %s: %v", source, err)
|
return fmt.Errorf("failed to set state for %s: %v", source, _err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
log.Fatalf("error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -254,6 +254,18 @@ func SetState(db DB, source string, state []byte) error {
|
|||||||
//
|
//
|
||||||
// Returns the number of new and deleted items on success.
|
// Returns the number of new and deleted items on success.
|
||||||
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
|
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||||
|
var new int
|
||||||
|
var del int
|
||||||
|
var err error
|
||||||
|
err = db.Transact(func(tx DB) error {
|
||||||
|
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return new, del, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
|
||||||
|
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
|
||||||
// Get the existing items
|
// Get the existing items
|
||||||
existingItems, err := GetAllItemsForSource(db, source)
|
existingItems, err := GetAllItemsForSource(db, source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -163,6 +165,51 @@ func TestUpdateSourceAddAndDelete(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateSourceTransaction(t *testing.T) {
|
||||||
|
db := EphemeralDb(t)
|
||||||
|
if err := AddSource(db, "s"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
a := Item{Source: "s", Id: "a"}
|
||||||
|
b := Item{Source: "s", Id: "b"}
|
||||||
|
|
||||||
|
// Add and deactivate a so it will be deleted on next fetch without it
|
||||||
|
if add, _, err := UpdateWithFetchedItems(db, "s", nil, []Item{a}); add != 1 || err != nil {
|
||||||
|
t.Fatalf("expected 1 add, got %d and err %v", add, err)
|
||||||
|
}
|
||||||
|
if _, err := DeactivateItem(db, "s", "a"); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add b and cause a to be deleted, but the delete throws an error
|
||||||
|
fdb := &FailureDb{
|
||||||
|
db: db,
|
||||||
|
execError: func(q string, a ...any) error {
|
||||||
|
if strings.Contains(q, "delete from") {
|
||||||
|
return errors.New("no deletes!")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
add, del, err := UpdateWithFetchedItems(fdb, "s", nil, []Item{b})
|
||||||
|
if add != 0 || del != 0 || err == nil {
|
||||||
|
t.Fatalf("expected failure, got %d %d %v", add, del, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Failure should not add b
|
||||||
|
items, err := GetAllItemsForSource(db, "s")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(items) != 1 {
|
||||||
|
t.Fatalf("should only have one item, got %d", len((items)))
|
||||||
|
}
|
||||||
|
if items[0].Id != "a" {
|
||||||
|
t.Fatalf("expected only item to still be a, got %s", items[0].Id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestOnCreateAction(t *testing.T) {
|
func TestOnCreateAction(t *testing.T) {
|
||||||
db := EphemeralDb(t)
|
db := EphemeralDb(t)
|
||||||
if err := AddSource(db, "test"); err != nil {
|
if err := AddSource(db, "test"); err != nil {
|
||||||
|
16
web/item.go
16
web/item.go
@ -1,6 +1,7 @@
|
|||||||
package web
|
package web
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
@ -77,13 +78,16 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = core.UpdateItems(env.db, []core.Item{newItem}); err != nil {
|
if err = env.db.Transact(func(tx core.DB) error {
|
||||||
|
if _err := core.UpdateItems(tx, []core.Item{newItem}); err != nil {
|
||||||
|
return fmt.Errorf("failed to update item: %v", _err)
|
||||||
|
}
|
||||||
|
if _err := core.SetState(tx, source, newState); err != nil {
|
||||||
|
return fmt.Errorf("failed to set state for %s: %v", source, _err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
http.Error(writer, err.Error(), 500)
|
http.Error(writer, err.Error(), 500)
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = core.SetState(env.db, source, newState); err != nil {
|
|
||||||
log.Fatalf("error: failed to set state for %s: %v", source, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
html.Item(writer, html.ItemData{Item: newItem})
|
html.Item(writer, html.ItemData{Item: newItem})
|
||||||
|
Loading…
Reference in New Issue
Block a user