package core import ( "database/sql" "encoding/json" "errors" "fmt" "log" "time" _ "github.com/mattn/go-sqlite3" ) func AddSource(db *DB, name string) error { _, err := db.Exec(` insert into sources (name) values (?) `, name) return err } func GetSources(db *DB) ([]string, error) { rows, err := db.Query(` select name from sources `) if err != nil { return nil, err } var names []string for rows.Next() { var name string if err = rows.Scan(&name); err != nil { return nil, err } names = append(names, name) } return names, nil } func DeleteSource(db *DB, name string) error { _, err := db.Exec(` delete from sources where name = ? `, name) return err } func AddItems(db *DB, items []Item) error { return db.Transact(func(tx *sql.Tx) error { stmt, err := tx.Prepare(` insert into items (source, id, active, title, author, body, link, time, action) values (?, ?, ?, ?, ?, ?, ?, ?, jsonb(?)) `) if err != nil { return fmt.Errorf("failed to prepare insert: %v", err) } for _, item := range items { actions, err := json.Marshal(item.Action) if err != nil { return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err) } _, err = stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time, actions) if err != nil { return fmt.Errorf("failed to insert %s/%s: %v", item.Source, item.Id, err) } } return nil }) } // Set fields in the new item to match the old item where the new item's fields are zero-valued. // This allows sources to omit fields and let an action set them without a later fetch overwriting // the value from the action, e.g. an on-create action archiving a page and setting the link to // point to the archive. func BackfillItem(new *Item, old *Item) { new.Active = old.Active new.Created = old.Created if new.Author == "" { new.Author = old.Author } if new.Body == "" { new.Body = old.Body } if new.Link == "" { new.Link = old.Link } if new.Time == 0 { new.Time = old.Time } if new.Title == "" { new.Title = old.Title } } func UpdateItems(db *DB, items []Item) error { return db.Transact(func(tx *sql.Tx) error { stmt, err := tx.Prepare(` update items set title = ?, author = ?, body = ?, link = ?, time = ?, action = jsonb(?) where source = ? and id = ? `) if err != nil { return err } for _, item := range items { actions, err := json.Marshal(item.Action) if err != nil { return fmt.Errorf("failed to marshal actions for %s/%s: %v", item.Source, item.Id, err) } _, err = stmt.Exec(item.Title, item.Author, item.Body, item.Link, item.Time, actions, item.Source, item.Id) if err != nil { return err } } return nil }) } // Deactivate an item, returning its previous active state. func DeactivateItem(db *DB, source string, id string) (bool, error) { row := db.QueryRow(` select active from items where source = ? and id = ? `, source, id) var active bool err := row.Scan(&active) if err != nil && errors.Is(err, sql.ErrNoRows) { return false, fmt.Errorf("item %s/%s not found", source, id) } _, err = db.Exec(` update items set active = 0 where source = ? and id = ? `, source, id) if err != nil { return false, err } return active, nil } func DeleteItem(db *DB, source string, id string) (int64, error) { res, err := db.Exec(` delete from items where source = ? and id = ? `, source, id) if err != nil { return 0, err } return res.RowsAffected() } func getItems(db *DB, query string, args ...any) ([]Item, error) { rows, err := db.Query(query, args...) if err != nil { return nil, err } var items []Item for rows.Next() { var item Item err = rows.Scan(&item.Source, &item.Id, &item.Created, &item.Active, &item.Title, &item.Author, &item.Body, &item.Link, &item.Time, &item.Action) if err != nil { return nil, err } items = append(items, item) } return items, nil } func GetItem(db *DB, source string, id string) (Item, error) { items, err := getItems(db, ` select source, id, created, active, title, author, body, link, time, json(action) from items where source = ? and id = ? `, source, id) if err != nil { return Item{}, err } if len(items) == 0 { return Item{}, fmt.Errorf("no item in %s with id %s", source, id) } return items[0], nil } func GetAllActiveItems(db *DB) ([]Item, error) { return getItems(db, ` select source, id, created, active, title, author, body, link, time, json(action) from items where active <> 0 `) } func GetAllItems(db *DB) ([]Item, error) { return getItems(db, ` select source, id, created, active, title, author, body, link, time, json(action) from items `) } func GetActiveItemsForSource(db *DB, source string) ([]Item, error) { return getItems(db, ` select source, id, created, active, title, author, body, link, time, json(action) from items where source = ? and active <> 0 `, source) } func GetAllItemsForSource(db *DB, source string) ([]Item, error) { return getItems(db, ` select source, id, created, active, title, author, body, link, time, json(action) from items where source = ? `, source) } // Given the results of a fetch, add new items, update existing items, and delete expired items. // // Returns the number of new and deleted items on success. func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) { // Get the existing items existingItems, err := GetAllItemsForSource(db, source) if err != nil { return 0, 0, err } existingIds := map[string]bool{} existingItemsById := map[string]*Item{} for _, item := range existingItems { existingIds[item.Id] = true existingItemsById[item.Id] = &item } // Split the fetch into adds and updates var newItems []Item var updatedItems []Item for _, item := range items { if existingIds[item.Id] { updatedItems = append(updatedItems, item) } else { newItems = append(newItems, item) } } // Bulk insert the new items if err = AddItems(db, newItems); err != nil { return 0, 0, err } // Bulk update the existing items for _, item := range updatedItems { BackfillItem(&item, existingItemsById[item.Id]) } if err = UpdateItems(db, updatedItems); err != nil { return 0, 0, err } // If the source has an on-create trigger, run it for each new item // On-create errors are ignored to avoid failing the fetch onCreateArgv, err := GetArgvForAction(db, source, "on_create") if err == nil { var updatedNewItems []Item for _, item := range newItems { itemJson, err := json.Marshal(item) if err != nil { log.Fatalf("error: failed to serialize item: %v", err) } res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute) if err != nil { log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err) continue } if len(res) != 1 { log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res)) } updatedItem := res[0] BackfillItem(&updatedItem, &item) updatedNewItems = append(updatedNewItems, updatedItem) } UpdateItems(db, updatedNewItems) } // Get the list of expired items fetchedIds := map[string]bool{} for _, item := range items { fetchedIds[item.Id] = true } expiredIds := map[string]bool{} for id := range existingIds { expiredIds[id] = !fetchedIds[id] } // Check expired items for deletion idsToDelete := map[string]bool{} for _, item := range existingItems { if expiredIds[item.Id] && item.Deletable() { idsToDelete[item.Id] = true } } // Delete each item to be deleted for id := range idsToDelete { if _, err = DeleteItem(db, source, id); err != nil { return 0, 0, err } } return len(newItems), len(idsToDelete), nil }