238 lines
5.2 KiB
Go
238 lines
5.2 KiB
Go
package core
|
|
|
|
import (
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
func AddSource(db DB, name string) error {
|
|
_, err := db.Exec(`
|
|
insert into sources (name)
|
|
values (?)
|
|
`, name)
|
|
|
|
return err
|
|
}
|
|
|
|
func GetSources(db DB) ([]string, error) {
|
|
rows, err := db.Query(`
|
|
select name
|
|
from sources
|
|
`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var names []string
|
|
for rows.Next() {
|
|
var name string
|
|
if err = rows.Scan(&name); err != nil {
|
|
return nil, err
|
|
}
|
|
names = append(names, name)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return names, nil
|
|
}
|
|
|
|
func SourceExists(db DB, source string) (bool, error) {
|
|
row := db.QueryRow("select count(*) from sources where name = ?", source)
|
|
var c int
|
|
err := row.Scan(&c)
|
|
return c > 0, err
|
|
}
|
|
|
|
func DeleteSource(db DB, name string) error {
|
|
_, err := db.Exec(`
|
|
delete from sources
|
|
where name = ?
|
|
`, name)
|
|
|
|
return err
|
|
}
|
|
|
|
func GetState(db DB, source string) ([]byte, error) {
|
|
row := db.QueryRow("select state from sources where name = ?", source)
|
|
var state []byte
|
|
err := row.Scan(&state)
|
|
return state, err
|
|
}
|
|
|
|
func SetState(db DB, source string, state []byte) error {
|
|
_, err := db.Exec("update sources set state = ? where name = ?", state, source)
|
|
return err
|
|
}
|
|
|
|
func getSourceTtx(db DB, source string, env string) (int, error) {
|
|
row := db.QueryRow(`
|
|
select value
|
|
from envs
|
|
where source = ?
|
|
and name = ?
|
|
`, source, env)
|
|
var ttx int
|
|
if err := row.Scan(&ttx); err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return 0, nil
|
|
}
|
|
return 0, err
|
|
}
|
|
return ttx, nil
|
|
}
|
|
|
|
func GetSourcePostProcessor(db DB, source string) (func(item Item) Item, error) {
|
|
ttl, err := getSourceTtx(db, source, "INTAKE_TTL")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ttd, err := getSourceTtx(db, source, "INTAKE_TTD")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tts, err := getSourceTtx(db, source, "INTAKE_TTS")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return func(item Item) Item {
|
|
if ttl != 0 {
|
|
item.Ttl = ttl
|
|
}
|
|
if ttd != 0 {
|
|
item.Ttd = ttd
|
|
}
|
|
if tts != 0 {
|
|
item.Tts = tts
|
|
}
|
|
return item
|
|
}, nil
|
|
}
|
|
|
|
// Given the results of a fetch, add new items, update existing items, and delete expired items.
|
|
//
|
|
// Returns the number of new and deleted items on success.
|
|
func UpdateWithFetchedItems(
|
|
db DB,
|
|
source string,
|
|
state []byte,
|
|
items []Item,
|
|
now time.Time,
|
|
) (int, int, error) {
|
|
var new int
|
|
var del int
|
|
var err error
|
|
err = db.Transact(func(tx DB) error {
|
|
new, del, err = updateWithFetchedItemsTx(tx, source, state, items, now)
|
|
return err
|
|
})
|
|
return new, del, err
|
|
}
|
|
|
|
// Implementation logic for [UpdateWithFetchedItems], which executes this inside a transaction.
|
|
func updateWithFetchedItemsTx(
|
|
db DB,
|
|
source string,
|
|
state []byte,
|
|
items []Item,
|
|
now time.Time,
|
|
) (int, int, error) {
|
|
// Get the existing items
|
|
existingItems, err := GetAllItemsForSource(db, source)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
existingIds := map[string]bool{}
|
|
existingItemsById := map[string]*Item{}
|
|
for _, item := range existingItems {
|
|
existingIds[item.Id] = true
|
|
existingItemsById[item.Id] = &item
|
|
}
|
|
|
|
// Split the fetch into adds and updates
|
|
var newItems []Item
|
|
var updatedItems []Item
|
|
for _, item := range items {
|
|
if existingIds[item.Id] {
|
|
updatedItems = append(updatedItems, item)
|
|
} else {
|
|
newItems = append(newItems, item)
|
|
}
|
|
}
|
|
|
|
// Bulk insert the new items
|
|
if err = AddItems(db, newItems); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
// Bulk update the existing items
|
|
for _, item := range updatedItems {
|
|
BackfillItem(&item, existingItemsById[item.Id])
|
|
}
|
|
if err = UpdateItems(db, updatedItems); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
envs, err := GetEnvs(db, source)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("failed to get envs for %s: %v", source, err)
|
|
}
|
|
|
|
postProcess, err := GetSourcePostProcessor(db, source)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("failed to get post-processor for %s: %v", source, err)
|
|
}
|
|
|
|
// If the source has an on-create trigger, run it for each new item
|
|
// On-create errors are ignored to avoid failing the fetch
|
|
onCreateArgv, err := GetArgvForAction(db, source, "on_create")
|
|
if err == nil && len(onCreateArgv) > 0 {
|
|
var updatedNewItems []Item
|
|
for _, item := range newItems {
|
|
var updatedItem Item
|
|
updatedItem, state, err = ExecuteItemAction(item, onCreateArgv, envs, state, time.Minute, postProcess)
|
|
if err != nil {
|
|
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)
|
|
}
|
|
updatedNewItems = append(updatedNewItems, updatedItem)
|
|
}
|
|
UpdateItems(db, updatedNewItems)
|
|
}
|
|
|
|
// Get the list of expired items
|
|
fetchedIds := map[string]bool{}
|
|
for _, item := range items {
|
|
fetchedIds[item.Id] = true
|
|
}
|
|
expiredIds := map[string]bool{}
|
|
for id := range existingIds {
|
|
expiredIds[id] = !fetchedIds[id]
|
|
}
|
|
|
|
// Check expired items for deletion
|
|
idsToDelete := map[string]bool{}
|
|
for _, item := range existingItems {
|
|
if expiredIds[item.Id] && item.Deletable(now) {
|
|
idsToDelete[item.Id] = true
|
|
}
|
|
}
|
|
|
|
// Delete each item to be deleted
|
|
for id := range idsToDelete {
|
|
if _, err = DeleteItem(db, source, id); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
}
|
|
|
|
if err = SetState(db, source, state); err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
return len(newItems), len(idsToDelete), nil
|
|
}
|