intake/core/source.go

187 lines
4.2 KiB
Go

package core
import (
"fmt"
"log"
"time"
_ "github.com/mattn/go-sqlite3"
)
// AddSource inserts a row into the sources table with the given name.
//
// Any error reported by the database is returned unchanged.
func AddSource(db DB, name string) error {
	const stmt = `
insert into sources (name)
values (?)
`
	if _, err := db.Exec(stmt, name); err != nil {
		return err
	}
	return nil
}
// GetSources returns the name of every row in the sources table, in the
// order the query yields them.
//
// The result is a nil slice when the table has no rows. A query, scan, or
// iteration error aborts the read and is returned with a nil slice.
func GetSources(db DB) ([]string, error) {
	const query = `
select name
from sources
`
	rows, err := db.Query(query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var sources []string
	for rows.Next() {
		var source string
		if scanErr := rows.Scan(&source); scanErr != nil {
			return nil, scanErr
		}
		sources = append(sources, source)
	}
	// rows.Next returning false can mean either "done" or "failed"; Err
	// distinguishes the two.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return sources, nil
}
// SourceExists reports whether the sources table has at least one row whose
// name equals source.
//
// The scan error, if any, is returned alongside the comparison result.
func SourceExists(db DB, source string) (bool, error) {
	var count int
	err := db.QueryRow("select count(*) from sources where name = ?", source).Scan(&count)
	return count > 0, err
}
// DeleteSource removes the rows of the sources table whose name equals name.
//
// The statement result is discarded, so this does not report whether any row
// was actually deleted; only a database error is surfaced.
func DeleteSource(db DB, name string) error {
	const stmt = `
delete from sources
where name = ?
`
	if _, err := db.Exec(stmt, name); err != nil {
		return err
	}
	return nil
}
// GetState reads the state column of the sources row whose name equals
// source.
//
// The scanned bytes and the scan error are both returned as-is.
func GetState(db DB, source string) ([]byte, error) {
	var stored []byte
	err := db.QueryRow("select state from sources where name = ?", source).Scan(&stored)
	return stored, err
}
// SetState writes state into the state column of the sources row whose name
// equals source.
//
// The statement result is discarded; only a database error is returned.
func SetState(db DB, source string, state []byte) error {
	const stmt = "update sources set state = ? where name = ?"
	_, err := db.Exec(stmt, state, source)
	return err
}
// UpdateWithFetchedItems applies the results of a fetch to a source: new
// items are added, existing items are updated, and expired items are deleted.
//
// The work is delegated to updateWithFetchedItemsTx, run through db.Transact.
//
// Returns the number of new and deleted items on success. The error is
// whatever db.Transact reports.
func UpdateWithFetchedItems(
	db DB,
	source string,
	state []byte,
	items []Item,
	now time.Time,
) (int, int, error) {
	var (
		added   int
		deleted int
	)
	txErr := db.Transact(func(tx DB) error {
		var innerErr error
		added, deleted, innerErr = updateWithFetchedItemsTx(tx, source, state, items, now)
		return innerErr
	})
	return added, deleted, txErr
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
//
// Steps, in order:
//  1. Load the source's existing items and split the fetched items into new
//     items (unknown id) and updated items (id already stored).
//  2. Insert the new items; backfill and update the existing ones.
//  3. If the source has an on_create action, run it for each new item and
//     store the resulting items. Failures here are logged, not returned.
//  4. Delete stored items that were absent from this fetch and report
//     themselves as deletable at time now.
//  5. Persist the (possibly action-updated) state with SetState.
//
// Returns the number of new items and the number of deleted items. On error
// both counts are zero.
func updateWithFetchedItemsTx(
	db DB,
	source string,
	state []byte,
	items []Item,
	now time.Time,
) (int, int, error) {
	// Get the existing items
	existingItems, err := GetAllItemsForSource(db, source)
	if err != nil {
		return 0, 0, err
	}
	// Take the address of the slice element, not of a range variable, so each
	// map entry refers to its own item regardless of Go version.
	existingItemsById := make(map[string]*Item, len(existingItems))
	for i := range existingItems {
		existingItemsById[existingItems[i].Id] = &existingItems[i]
	}

	// Split the fetch into adds and updates, recording every fetched id for
	// the expiry check further down.
	var newItems []Item
	var updatedItems []Item
	fetchedIds := make(map[string]bool, len(items))
	for _, item := range items {
		fetchedIds[item.Id] = true
		if _, exists := existingItemsById[item.Id]; exists {
			updatedItems = append(updatedItems, item)
		} else {
			newItems = append(newItems, item)
		}
	}

	// Bulk insert the new items
	if err = AddItems(db, newItems); err != nil {
		return 0, 0, err
	}

	// Bulk update the existing items. Backfill through the slice element so
	// the changes land in the values handed to UpdateItems rather than in a
	// discarded loop copy.
	for i := range updatedItems {
		BackfillItem(&updatedItems[i], existingItemsById[updatedItems[i].Id])
	}
	if err = UpdateItems(db, updatedItems); err != nil {
		return 0, 0, err
	}

	envs, err := GetEnvs(db, source)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to get envs for %s: %w", source, err)
	}

	// If the source has an on-create trigger, run it for each new item
	// On-create errors are ignored to avoid failing the fetch
	// A lookup error is treated the same as "no trigger configured".
	onCreateArgv, err := GetArgvForAction(db, source, "on_create")
	if err == nil && len(onCreateArgv) > 0 {
		updatedNewItems := make([]Item, 0, len(newItems))
		for _, item := range newItems {
			updatedItem, newState, actionErr := ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute)
			if actionErr != nil {
				// Keep the item as inserted and the state as it was; the
				// results of a failed action are not trusted.
				log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, actionErr)
				continue
			}
			state = newState
			updatedNewItems = append(updatedNewItems, updatedItem)
		}
		// Still best-effort, but leave a trace instead of dropping the error.
		if updateErr := UpdateItems(db, updatedNewItems); updateErr != nil {
			log.Printf("error: failed to store on_create results for %s: %v", source, updateErr)
		}
	}

	// An existing item is expired when it was not part of this fetch; expired
	// items are deleted only if they report themselves deletable at now.
	idsToDelete := map[string]bool{}
	for _, item := range existingItems {
		if !fetchedIds[item.Id] && item.Deletable(now) {
			idsToDelete[item.Id] = true
		}
	}

	// Delete each item to be deleted
	for id := range idsToDelete {
		if _, err = DeleteItem(db, source, id); err != nil {
			return 0, 0, err
		}
	}

	if err = SetState(db, source, state); err != nil {
		return 0, 0, err
	}

	return len(newItems), len(idsToDelete), nil
}