intake/core/source.go

175 lines
4.1 KiB
Go
Raw Normal View History

2025-01-16 22:53:04 +00:00
package core
2025-01-16 19:46:37 +00:00
import (
2025-01-16 21:46:30 +00:00
"fmt"
2025-01-24 23:41:52 +00:00
"log"
"time"
2025-01-16 19:46:37 +00:00
_ "github.com/mattn/go-sqlite3"
)
2025-01-31 16:44:09 +00:00
func AddSource(db DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
insert into sources (name)
values (?)
`, name)
return err
}
2025-01-31 16:44:09 +00:00
func GetSources(db DB) ([]string, error) {
2025-01-23 15:55:11 +00:00
rows, err := db.Query(`
select name
from sources
`)
if err != nil {
return nil, err
}
2025-02-01 00:01:47 +00:00
defer rows.Close()
2025-01-23 15:55:11 +00:00
var names []string
for rows.Next() {
var name string
if err = rows.Scan(&name); err != nil {
return nil, err
}
names = append(names, name)
}
2025-02-01 00:01:47 +00:00
if err := rows.Err(); err != nil {
return nil, err
}
2025-01-23 15:55:11 +00:00
return names, nil
}
// SourceExists reports whether at least one row in the sources table has the
// given name. It counts matching rows and compares the count with zero. If the
// scan fails, the error is returned alongside the comparison of whatever count
// was read.
func SourceExists(db DB, source string) (bool, error) {
	var count int
	scanErr := db.QueryRow("select count(*) from sources where name = ?", source).Scan(&count)
	exists := count > 0
	return exists, scanErr
}
2025-01-31 16:44:09 +00:00
func DeleteSource(db DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
2025-01-23 15:55:11 +00:00
delete from sources
where name = ?
2025-01-16 21:46:30 +00:00
`, name)
return err
}
2025-01-31 16:44:09 +00:00
func GetState(db DB, source string) ([]byte, error) {
2025-01-30 23:22:57 +00:00
row := db.QueryRow("select state from sources where name = ?", source)
var state []byte
err := row.Scan(&state)
return state, err
}
2025-01-31 16:44:09 +00:00
func SetState(db DB, source string, state []byte) error {
2025-01-30 23:22:57 +00:00
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
return err
}
2025-01-23 20:26:21 +00:00
// Given the results of a fetch, add new items, update existing items, and delete expired items.
2025-01-29 16:48:12 +00:00
//
// Returns the number of new and deleted items on success.
2025-01-31 16:44:09 +00:00
func UpdateWithFetchedItems(db DB, source string, state []byte, items []Item) (int, int, error) {
2025-01-31 17:55:07 +00:00
var new int
var del int
var err error
err = db.Transact(func(tx DB) error {
new, del, err = updateWithFetchedItemsTx(tx, source, state, items)
return err
})
return new, del, err
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
func updateWithFetchedItemsTx(db DB, source string, state []byte, items []Item) (int, int, error) {
2025-01-23 20:26:21 +00:00
// Get the existing items
existingItems, err := GetAllItemsForSource(db, source)
if err != nil {
return 0, 0, err
}
existingIds := map[string]bool{}
2025-01-23 22:24:16 +00:00
existingItemsById := map[string]*Item{}
2025-01-23 20:26:21 +00:00
for _, item := range existingItems {
existingIds[item.Id] = true
2025-01-23 22:24:16 +00:00
existingItemsById[item.Id] = &item
2025-01-23 20:26:21 +00:00
}
// Split the fetch into adds and updates
var newItems []Item
var updatedItems []Item
for _, item := range items {
if existingIds[item.Id] {
updatedItems = append(updatedItems, item)
} else {
newItems = append(newItems, item)
}
}
// Bulk insert the new items
if err = AddItems(db, newItems); err != nil {
return 0, 0, err
}
// Bulk update the existing items
2025-01-23 22:24:16 +00:00
for _, item := range updatedItems {
BackfillItem(&item, existingItemsById[item.Id])
}
if err = UpdateItems(db, updatedItems); err != nil {
return 0, 0, err
}
2025-01-23 20:26:21 +00:00
2025-01-31 16:00:07 +00:00
envs, err := GetEnvs(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
}
2025-01-24 23:41:52 +00:00
// If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
2025-01-31 16:00:07 +00:00
if err == nil && len(onCreateArgv) > 0 {
2025-01-24 23:41:52 +00:00
var updatedNewItems []Item
for _, item := range newItems {
var updatedItem Item
2025-01-31 16:00:07 +00:00
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
2025-01-24 23:41:52 +00:00
if err != nil {
2025-01-30 16:27:45 +00:00
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
2025-01-24 23:41:52 +00:00
}
updatedNewItems = append(updatedNewItems, updatedItem)
}
UpdateItems(db, updatedNewItems)
}
2025-01-23 20:26:21 +00:00
// Get the list of expired items
fetchedIds := map[string]bool{}
for _, item := range items {
fetchedIds[item.Id] = true
}
expiredIds := map[string]bool{}
for id := range existingIds {
expiredIds[id] = !fetchedIds[id]
}
// Check expired items for deletion
idsToDelete := map[string]bool{}
for _, item := range existingItems {
if expiredIds[item.Id] && item.Deletable() {
idsToDelete[item.Id] = true
}
}
// Delete each item to be deleted
for id := range idsToDelete {
if _, err = DeleteItem(db, source, id); err != nil {
return 0, 0, err
}
}
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
2025-01-23 20:26:21 +00:00
return len(newItems), len(idsToDelete), nil
}