intake/core/source.go

293 lines
6.5 KiB
Go
Raw Normal View History

2025-01-16 22:53:04 +00:00
package core
2025-01-16 19:46:37 +00:00
import (
2025-02-05 21:21:31 +00:00
"database/sql"
"errors"
2025-01-16 21:46:30 +00:00
"fmt"
2025-01-24 23:41:52 +00:00
"log"
"time"
2025-01-16 19:46:37 +00:00
_ "github.com/mattn/go-sqlite3"
)
2025-01-31 16:44:09 +00:00
func AddSource(db DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
2025-02-19 03:50:19 +00:00
insert into sources (name, lastUpdated)
values (?, 0)
2025-01-16 21:46:30 +00:00
`, name)
return err
}
2025-01-31 16:44:09 +00:00
func GetSources(db DB) ([]string, error) {
2025-01-23 15:55:11 +00:00
rows, err := db.Query(`
select name
from sources
`)
if err != nil {
return nil, err
}
2025-02-01 00:01:47 +00:00
defer rows.Close()
2025-01-23 15:55:11 +00:00
var names []string
for rows.Next() {
var name string
if err = rows.Scan(&name); err != nil {
return nil, err
}
names = append(names, name)
}
2025-02-01 00:01:47 +00:00
if err := rows.Err(); err != nil {
return nil, err
}
2025-01-23 15:55:11 +00:00
return names, nil
}
// SourceExists reports whether at least one source row is named source.
// The boolean is only meaningful when the returned error is nil.
func SourceExists(db DB, source string) (bool, error) {
	var matches int
	err := db.QueryRow("select count(*) from sources where name = ?", source).Scan(&matches)
	return matches > 0, err
}
2025-01-31 16:44:09 +00:00
func DeleteSource(db DB, name string) error {
2025-01-16 21:46:30 +00:00
_, err := db.Exec(`
2025-01-23 15:55:11 +00:00
delete from sources
where name = ?
2025-01-16 21:46:30 +00:00
`, name)
return err
}
2025-01-31 16:44:09 +00:00
func GetState(db DB, source string) ([]byte, error) {
2025-01-30 23:22:57 +00:00
row := db.QueryRow("select state from sources where name = ?", source)
var state []byte
err := row.Scan(&state)
return state, err
}
2025-01-31 16:44:09 +00:00
func SetState(db DB, source string, state []byte) error {
2025-01-30 23:22:57 +00:00
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
return err
}
2025-02-19 03:50:19 +00:00
// GetLastUpdated reads the lastUpdated column for source, interpreted as whole
// Unix seconds, and returns it as a UTC time. A stored 0 maps to the Unix
// epoch. When an error is returned, the time value should be ignored.
func GetLastUpdated(db DB, source string) (time.Time, error) {
	var seconds int
	err := db.QueryRow("select lastUpdated from sources where name = ?", source).Scan(&seconds)
	stamp := time.Unix(int64(seconds), 0)
	return stamp.UTC(), err
}
// BumpLastUpdated stores now, truncated to whole Unix seconds, as the
// lastUpdated value for source.
func BumpLastUpdated(db DB, source string, now time.Time) error {
	const setLastUpdated = `
update sources
set lastUpdated = ?
where name = ?
`
	unixSeconds := now.Unix()
	_, err := db.Exec(setLastUpdated, unixSeconds, source)
	return err
}
2025-02-05 21:21:31 +00:00
// getSourceTtx reads the env called env for source from the envs table and
// scans its value as an int. A missing env yields (0, nil), which callers
// treat as "not configured". Any other scan error is returned with a 0 value.
func getSourceTtx(db DB, source string, env string) (int, error) {
	const selectEnvValue = `
select value
from envs
where source = ?
and name = ?
`
	var value int
	err := db.QueryRow(selectEnvValue, source, env).Scan(&value)
	switch {
	case err == nil:
		return value, nil
	case errors.Is(err, sql.ErrNoRows):
		return 0, nil
	default:
		return 0, err
	}
}
// GetSourcePostProcessor builds a function that overrides an item's Ttl, Ttd,
// and Tts with the source's INTAKE_TTL, INTAKE_TTD, and INTAKE_TTS envs.
// An env that is absent (or set to 0) leaves the corresponding item field as
// it was. The envs are read once, here, not on every call of the returned
// function. If any lookup fails, the first error is returned.
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) {
	envNames := [...]string{"INTAKE_TTL", "INTAKE_TTD", "INTAKE_TTS"}
	var overrides [len(envNames)]int
	for i, envName := range envNames {
		v, err := getSourceTtx(db, source, envName)
		if err != nil {
			return nil, err
		}
		overrides[i] = v
	}
	ttl, ttd, tts := overrides[0], overrides[1], overrides[2]

	apply := func(item Item) Item {
		if ttl != 0 {
			item.Ttl = ttl
		}
		if ttd != 0 {
			item.Ttd = ttd
		}
		if tts != 0 {
			item.Tts = tts
		}
		return item
	}
	return apply, nil
}
2025-02-07 23:06:05 +00:00
// GetSourceActionInputs loads everything needed to run action for source:
// the stored state blob, the source's envs, the argv configured for action,
// and the post-processor derived from the source's INTAKE_TTL/TTD/TTS envs.
//
// On failure, all data results are nil and err describes which load failed.
// The underlying error is wrapped with %w, so callers can inspect it with
// errors.Is/errors.As. For example, they can detect the sql.ErrNoRows that
// GetState's row scan yields for an unknown source.
func GetSourceActionInputs(
	db DB,
	source string,
	action string,
) (
	state []byte,
	envs []string,
	argv []string,
	postProcess func(Item) Item,
	err error,
) {
	state, err = GetState(db, source)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("failed to load state for %s: %w", source, err)
	}
	envs, err = GetEnvs(db, source)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("failed to get envs for %s: %w", source, err)
	}
	argv, err = GetArgvForAction(db, source, action)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("failed to get %s action for %s: %w", action, source, err)
	}
	postProcess, err = GetSourcePostProcessor(db, source)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("failed to get %s post-processor: %w", source, err)
	}
	return state, envs, argv, postProcess, nil
}
2025-01-23 20:26:21 +00:00
// Given the results of a fetch, add new items, update existing items, and delete expired items.
2025-01-29 16:48:12 +00:00
//
// Returns the number of new and deleted items on success.
2025-02-05 19:38:30 +00:00
func UpdateWithFetchedItems(
db DB,
source string,
state []byte,
items []Item,
now time.Time,
) (int, int, error) {
2025-01-31 17:55:07 +00:00
var new int
var del int
var err error
err = db.Transact(func(tx DB) error {
2025-02-05 19:38:30 +00:00
new, del, err = updateWithFetchedItemsTx(tx, source, state, items, now)
2025-01-31 17:55:07 +00:00
return err
})
return new, del, err
}
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
2025-02-05 19:38:30 +00:00
func updateWithFetchedItemsTx(
db DB,
source string,
state []byte,
items []Item,
now time.Time,
) (int, int, error) {
// Get all existing items
existingItems, err := GetAllItemsForSource(db, source, 0, -1)
2025-01-23 20:26:21 +00:00
if err != nil {
return 0, 0, err
}
existingIds := map[string]bool{}
2025-01-23 22:24:16 +00:00
existingItemsById := map[string]*Item{}
2025-01-23 20:26:21 +00:00
for _, item := range existingItems {
existingIds[item.Id] = true
2025-01-23 22:24:16 +00:00
existingItemsById[item.Id] = &item
2025-01-23 20:26:21 +00:00
}
// Split the fetch into adds and updates
var newItems []Item
var updatedItems []Item
for _, item := range items {
if existingIds[item.Id] {
updatedItems = append(updatedItems, item)
} else {
newItems = append(newItems, item)
}
}
// Bulk insert the new items
if err = AddItems(db, newItems); err != nil {
return 0, 0, err
}
// Bulk update the existing items
2025-01-23 22:24:16 +00:00
for _, item := range updatedItems {
BackfillItem(&item, existingItemsById[item.Id])
}
if err = UpdateItems(db, updatedItems); err != nil {
return 0, 0, err
}
2025-01-23 20:26:21 +00:00
2025-01-31 16:00:07 +00:00
envs, err := GetEnvs(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
}
2025-02-05 21:21:31 +00:00
postProcess, err := GetSourcePostProcessor(db, source)
if err != nil {
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
}
2025-01-24 23:41:52 +00:00
// If the source has an on-create trigger, run it for each new item
// On-create errors are ignored to avoid failing the fetch
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
2025-01-31 16:00:07 +00:00
if err == nil && len(onCreateArgv) > 0 {
2025-01-24 23:41:52 +00:00
var updatedNewItems []Item
for _, item := range newItems {
var updatedItem Item
2025-02-10 16:22:19 +00:00
var errItem Item
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, DefaultTimeout, postProcess)
2025-01-24 23:41:52 +00:00
if err != nil {
2025-02-10 16:22:19 +00:00
AddErrorItem(db, errItem)
2025-01-30 16:27:45 +00:00
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
2025-01-24 23:41:52 +00:00
}
updatedNewItems = append(updatedNewItems, updatedItem)
}
UpdateItems(db, updatedNewItems)
}
2025-01-23 20:26:21 +00:00
// Get the list of expired items
fetchedIds := map[string]bool{}
for _, item := range items {
fetchedIds[item.Id] = true
}
expiredIds := map[string]bool{}
for id := range existingIds {
expiredIds[id] = !fetchedIds[id]
}
// Check expired items for deletion
idsToDelete := map[string]bool{}
for _, item := range existingItems {
2025-02-05 19:38:30 +00:00
if expiredIds[item.Id] && item.Deletable(now) {
2025-01-23 20:26:21 +00:00
idsToDelete[item.Id] = true
}
}
// Delete each item to be deleted
for id := range idsToDelete {
if _, err = DeleteItem(db, source, id); err != nil {
return 0, 0, err
}
}
if err = SetState(db, source, state); err != nil {
return 0, 0, err
}
2025-02-19 03:50:19 +00:00
if err = BumpLastUpdated(db, source, now); err != nil {
return 0, 0, err
}
2025-01-23 20:26:21 +00:00
return len(newItems), len(idsToDelete), nil
}