intake/core/source.go

335 lines
7.4 KiB
Go
Raw Normal View History

2025-01-16 22:53:04 +00:00
package core
2025-01-16 19:46:37 +00:00
import (
"database/sql"
2025-01-24 23:41:52 +00:00
"encoding/json"
2025-01-17 06:02:03 +00:00
"errors"
2025-01-16 21:46:30 +00:00
"fmt"
2025-01-24 23:41:52 +00:00
"log"
"time"
2025-01-16 19:46:37 +00:00
_ "github.com/mattn/go-sqlite3"
)
func AddSource(db *DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
insert into sources (name)
values (?)
`, name)
return err
}
2025-01-23 15:55:11 +00:00
// GetSources returns the name of every row in the sources table.
// The query has no ORDER BY, so the order is whatever the database yields.
// It returns a nil slice when there are no sources.
func GetSources(db *DB) ([]string, error) {
	rows, err := db.Query(`
select name
from sources
`)
	if err != nil {
		return nil, err
	}
	// Release the underlying connection even when returning early.
	defer rows.Close()

	var names []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		names = append(names, name)
	}
	// rows.Next also returns false on an iteration error; don't mistake that
	// for the end of the result set.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return names, nil
}
func DeleteSource(db *DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
2025-01-23 15:55:11 +00:00
delete from sources
where name = ?
2025-01-16 21:46:30 +00:00
`, name)
return err
}
func AddItem(
db *DB,
2025-01-16 21:46:30 +00:00
source string,
id string,
title string,
author string,
body string,
link string,
time int,
) error {
_, err := db.Exec(`
insert into items (source, id, active, title, author, body, link, time)
values (?, ?, ?, ?, ?, ?, ?, ?)
`, source, id, true, title, author, body, link, time)
return err
}
2025-01-23 20:26:21 +00:00
func AddItems(db *DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error {
stmt, err := tx.Prepare(`
insert into items (source, id, active, title, author, body, link, time)
values (?, ?, ?, ?, ?, ?, ?, ?)
`)
if err != nil {
return err
}
for _, item := range items {
_, err = stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time)
if err != nil {
return err
}
}
return nil
})
}
2025-01-23 22:24:16 +00:00
// BackfillItem fills in new from old wherever new holds a zero value, so a
// source may omit fields without a later fetch overwriting values that an
// action set earlier. For example, an on-create action might archive a page
// and point Link at the archive.
//
// Active and Created are always copied from old, whatever new contains.
// Title, Author, Body, Link (when "") and Time (when 0) are copied only if
// unset in new. Source and Id are never touched.
func BackfillItem(new *Item, old *Item) {
	keepOrFill := func(dst *string, fallback string) {
		if *dst == "" {
			*dst = fallback
		}
	}

	new.Active, new.Created = old.Active, old.Created

	keepOrFill(&new.Title, old.Title)
	keepOrFill(&new.Author, old.Author)
	keepOrFill(&new.Body, old.Body)
	keepOrFill(&new.Link, old.Link)

	if new.Time == 0 {
		new.Time = old.Time
	}
}
func UpdateItems(db *DB, items []Item) error {
return db.Transact(func(tx *sql.Tx) error {
stmt, err := tx.Prepare(`
update items
set
title = ?,
author = ?,
body = ?,
link = ?,
time = ?
where source = ?
and id = ?
`)
if err != nil {
return err
}
for _, item := range items {
_, err = stmt.Exec(item.Title, item.Author, item.Body, item.Link, item.Time, item.Source, item.Id)
if err != nil {
return err
}
}
return nil
})
}
2025-01-17 06:02:03 +00:00
// Deactivate an item, returning its previous active state.
func DeactivateItem(db *DB, source string, id string) (bool, error) {
2025-01-17 06:02:03 +00:00
row := db.QueryRow(`
select active
from items
where source = ? and id = ?
`, source, id)
var active bool
err := row.Scan(&active)
if err != nil && errors.Is(err, sql.ErrNoRows) {
return false, fmt.Errorf("item %s/%s not found", source, id)
}
_, err = db.Exec(`
2025-01-16 21:46:30 +00:00
update items
set active = 0
where source = ? and id = ?
`, source, id)
if err != nil {
2025-01-17 06:02:03 +00:00
return false, err
2025-01-16 21:46:30 +00:00
}
2025-01-17 06:02:03 +00:00
return active, nil
2025-01-16 21:46:30 +00:00
}
func DeleteItem(db *DB, source string, id string) (int64, error) {
res, err := db.Exec(`
delete from items
where source = ?
and id = ?
`, source, id)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
2025-01-23 19:34:44 +00:00
func getItems(db *DB, query string, args ...any) ([]Item, error) {
rows, err := db.Query(query, args...)
2025-01-17 15:05:57 +00:00
if err != nil {
return nil, err
}
var items []Item
for rows.Next() {
var item Item
2025-01-23 17:08:17 +00:00
err = rows.Scan(&item.Source, &item.Id, &item.Created, &item.Active, &item.Title, &item.Author, &item.Body, &item.Link, &item.Time)
if err != nil {
return nil, err
}
2025-01-17 15:05:57 +00:00
items = append(items, item)
}
return items, nil
}
2025-01-23 21:32:09 +00:00
func GetItem(db *DB, source string, id string) (Item, error) {
items, err := getItems(db, `
select source, id, created, active, title, author, body, link, time
from items
where source = ?
and id = ?
`, source, id)
if err != nil {
return Item{}, err
}
if len(items) == 0 {
return Item{}, fmt.Errorf("no item in %s with id %s", source, id)
}
return items[0], nil
}
2025-01-23 19:34:44 +00:00
// GetAllActiveItems returns every item, across all sources, whose active
// column is nonzero.
func GetAllActiveItems(db *DB) ([]Item, error) {
	const selectActive = `
select
source, id, created, active, title, author, body, link, time
from items
where active <> 0
`
	return getItems(db, selectActive)
}
// GetAllItems returns every item in the items table, active or not, across
// all sources.
func GetAllItems(db *DB) ([]Item, error) {
	const selectAll = `
select
source, id, created, active, title, author, body, link, time
from items
`
	return getItems(db, selectAll)
}
func GetActiveItemsForSource(db *DB, source string) ([]Item, error) {
2025-01-23 19:34:44 +00:00
return getItems(db, `
2025-01-16 21:46:30 +00:00
select
source, id, created, active, title, author, body, link, time
2025-01-16 21:46:30 +00:00
from items
where
source = ?
and active <> 0
`, source)
}
func GetAllItemsForSource(db *DB, source string) ([]Item, error) {
return getItems(db, `
select
source, id, created, active, title, author, body, link, time
from items
where
source = ?
`, source)
}
2025-01-23 20:26:21 +00:00
// Given the results of a fetch, add new items, update existing items, and delete expired items.
func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) {
// Get the existing items
existingItems, err := GetAllItemsForSource(db, source)
if err != nil {
return 0, 0, err
}
existingIds := map[string]bool{}
2025-01-23 22:24:16 +00:00
existingItemsById := map[string]*Item{}
2025-01-23 20:26:21 +00:00
for _, item := range existingItems {
existingIds[item.Id] = true
2025-01-23 22:24:16 +00:00
existingItemsById[item.Id] = &item
2025-01-23 20:26:21 +00:00
}
// Split the fetch into adds and updates
var newItems []Item
var updatedItems []Item
for _, item := range items {
if existingIds[item.Id] {
updatedItems = append(updatedItems, item)
} else {
newItems = append(newItems, item)
}
}
// Bulk insert the new items
if err = AddItems(db, newItems); err != nil {
return 0, 0, err
}
// Bulk update the existing items
2025-01-23 22:24:16 +00:00
for _, item := range updatedItems {
BackfillItem(&item, existingItemsById[item.Id])
}
if err = UpdateItems(db, updatedItems); err != nil {
return 0, 0, err
}
2025-01-23 20:26:21 +00:00
2025-01-24 23:41:52 +00:00
// If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
if err == nil {
var updatedNewItems []Item
for _, item := range newItems {
itemJson, err := json.Marshal(item)
if err != nil {
log.Fatalf("error: failed to serialize item: %v", err)
}
res, err := Execute(source, onCreateArgv, nil, string(itemJson), time.Minute)
if err != nil {
log.Printf("error: failed to execute on_create for %s/%s: %v", item.Source, item.Id, err)
continue
}
if len(res) != 1 {
log.Printf("error: expected on_create for %s/%s to produce exactly one item, got %d", item.Source, item.Id, len(res))
}
updatedItem := res[0]
BackfillItem(&updatedItem, &item)
updatedNewItems = append(updatedNewItems, updatedItem)
}
UpdateItems(db, updatedNewItems)
}
2025-01-23 20:26:21 +00:00
// Get the list of expired items
fetchedIds := map[string]bool{}
for _, item := range items {
fetchedIds[item.Id] = true
}
expiredIds := map[string]bool{}
for id := range existingIds {
expiredIds[id] = !fetchedIds[id]
}
// Check expired items for deletion
idsToDelete := map[string]bool{}
for _, item := range existingItems {
if expiredIds[item.Id] && item.Deletable() {
idsToDelete[item.Id] = true
}
}
// Delete each item to be deleted
for id := range idsToDelete {
if _, err = DeleteItem(db, source, id); err != nil {
return 0, 0, err
}
}
return len(newItems), len(idsToDelete), nil
}