intake/core/db.go

199 lines
3.8 KiB
Go

package core
import (
	"database/sql"
	"embed"
	"errors"
	"fmt"
	"log"
	"sort"

	_ "github.com/mattn/go-sqlite3"
)
// migrations holds the sql/*.sql files embedded into the binary at build
// time. Entries are addressed with the "sql/" path prefix.
//
//go:embed sql/*.sql
var migrations embed.FS
// InitDatabase idempotently initializes the database and is safe to call
// unconditionally.
//
// If sqlite_master already lists a table named "migrations" it does nothing.
// Otherwise it applies the 0000_baseline.sql migration. Any error from the
// existence check or from applying the baseline is returned.
func InitDatabase(db *sql.DB) error {
	// "select exists (...)" always yields exactly one row, so QueryRow is
	// sufficient and releases the underlying rows once Scan returns.
	var exists bool
	err := db.QueryRow(`
select exists (
select 1
from sqlite_master
where type = 'table'
and name = 'migrations'
)
`).Scan(&exists)
	if err != nil {
		// Do not fall through to the baseline when the check itself failed.
		return err
	}
	if exists {
		return nil
	}
	return ApplyMigration(db, "0000_baseline.sql")
}
// GetPendingMigrations returns a map from migration name to whether that
// migration has been applied.
//
// Every file embedded under sql/ starts out as false; every name recorded in
// the migrations table is then set to true (including names that have no
// embedded file). An error is returned if the embedded directory cannot be
// read, or if querying or reading the migrations table fails.
func GetPendingMigrations(db *sql.DB) (map[string]bool, error) {
	allMigrations, err := migrations.ReadDir("sql")
	if err != nil {
		return nil, err
	}
	complete := make(map[string]bool, len(allMigrations))
	for _, mig := range allMigrations {
		complete[mig.Name()] = false
	}

	rows, err := db.Query("select name from migrations")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		complete[name] = true
	}
	// Next returning false may mean an iteration error rather than the end of
	// the result set; a truncated read would make applied migrations look
	// pending.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return complete, nil
}
// ApplyMigration applies the embedded migration sql/<name> and records it in
// the migrations table.
//
// It returns an error if the migration file is not embedded, if executing its
// SQL fails, or if recording the migration fails. Returned errors wrap the
// underlying cause.
//
// NOTE(review): the script and the bookkeeping insert are issued as two
// separate Exec calls on db, not inside an explicit transaction.
func ApplyMigration(db *sql.DB, name string) error {
	data, err := migrations.ReadFile("sql/" + name)
	if err != nil {
		// Report the problem to the caller instead of terminating the process.
		return fmt.Errorf("missing migration %s: %w", name, err)
	}

	log.Printf("Applying migration %s", name)
	if _, err := db.Exec(string(data)); err != nil {
		return fmt.Errorf("applying migration %s: %w", name, err)
	}
	if _, err := db.Exec("insert into migrations (name) values (?)", name); err != nil {
		return fmt.Errorf("recording migration %s: %w", name, err)
	}
	return nil
}
// MigrateDatabase applies all pending migrations.
//
// Pending migrations are those GetPendingMigrations reports as not applied.
// They are applied in ascending lexical order of name, stopping at and
// returning the first error.
func MigrateDatabase(db *sql.DB) error {
	pending, err := GetPendingMigrations(db)
	if err != nil {
		return err
	}

	// Map iteration order is unspecified, so collect and sort the names to
	// get a deterministic order. Migration files appear to carry a zero-padded
	// numeric prefix (e.g. 0000_baseline.sql), so lexical order is presumably
	// the intended order — TODO confirm.
	names := make([]string, 0, len(pending))
	for name, complete := range pending {
		if !complete {
			names = append(names, name)
		}
	}
	sort.Strings(names)

	for _, name := range names {
		if err := ApplyMigration(db, name); err != nil {
			return err
		}
	}
	return nil
}
// AddSource inserts a row with the given name into the sources table and
// returns any error reported by the database.
func AddSource(db *sql.DB, name string) error {
	const insertSource = `
insert into sources (name)
values (?)
`
	if _, err := db.Exec(insertSource, name); err != nil {
		return err
	}
	return nil
}
// DeleteSource removes every row of the sources table whose name equals the
// given name and returns any error reported by the database.
func DeleteSource(db *sql.DB, name string) error {
	const deleteSource = `
delete from sources
where name = ?
`
	_, execErr := db.Exec(deleteSource, name)
	return execErr
}
// AddItem inserts a new row into the items table using the supplied column
// values. The active column is always written as true. Any error reported by
// the database is returned.
func AddItem(
	db *sql.DB,
	source string,
	id string,
	title string,
	author string,
	body string,
	link string,
	time int,
) error {
	const insertItem = `
insert into items (source, id, active, title, author, body, link, time)
values (?, ?, ?, ?, ?, ?, ?, ?)
`
	// Values are listed in the same order as the column list above.
	values := []interface{}{source, id, true, title, author, body, link, time}
	if _, err := db.Exec(insertItem, values...); err != nil {
		return err
	}
	return nil
}
// DeactivateItem deactivates the item identified by source and id, returning
// its previous active state.
//
// It returns an error if no such item exists ("item <source>/<id> not
// found"), if reading the current state fails, or if the update fails. On any
// error the returned bool is false and no update is attempted after a failed
// read.
func DeactivateItem(db *sql.DB, source string, id string) (bool, error) {
	var active bool
	err := db.QueryRow(`
select active
from items
where source = ? and id = ?
`, source, id).Scan(&active)
	if errors.Is(err, sql.ErrNoRows) {
		return false, fmt.Errorf("item %s/%s not found", source, id)
	}
	if err != nil {
		// Any other read failure must stop here; otherwise the update would
		// run and a bogus "previously inactive" result would be reported.
		return false, err
	}

	_, err = db.Exec(`
update items
set active = 0
where source = ? and id = ?
`, source, id)
	if err != nil {
		return false, err
	}
	return active, nil
}
// GetAllActiveItems returns every row of the items table whose active column
// is non-zero.
//
// The result is nil when no rows match. An error is returned if the query
// fails, if any row cannot be scanned into an Item, or if iteration ends with
// an error; in those cases no partial result is returned.
func GetAllActiveItems(db *sql.DB) ([]Item, error) {
	rows, err := db.Query(`
select
source, id, created, active, title, author, body, link, time
from items
where active <> 0
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var items []Item
	for rows.Next() {
		var item Item
		// Destination order must match the select column order above.
		err := rows.Scan(
			&item.Source, &item.Id, &item.Created, &item.Active,
			&item.Title, &item.Author, &item.Body, &item.Link, &item.Time,
		)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	// Distinguish "no more rows" from an iteration error.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
// GetActiveItemsForSource returns every row of the items table that belongs
// to the given source and whose active column is non-zero.
//
// The result is nil when no rows match. An error is returned if the query
// fails, if any row cannot be scanned into an Item, or if iteration ends with
// an error; in those cases no partial result is returned.
func GetActiveItemsForSource(db *sql.DB, source string) ([]Item, error) {
	rows, err := db.Query(`
select
source, id, created, active, title, author, body, link, time
from items
where
source = ?
and active <> 0
`, source)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var items []Item
	for rows.Next() {
		var item Item
		// Destination order must match the select column order above.
		err := rows.Scan(
			&item.Source, &item.Id, &item.Created, &item.Active,
			&item.Title, &item.Author, &item.Body, &item.Link, &item.Time,
		)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	// Distinguish "no more rows" from an iteration error.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}