Implement source fetch

This commit is contained in:
Tim Van Baak 2025-01-23 12:26:21 -08:00
parent dde799ff8e
commit b7683f6805
7 changed files with 183 additions and 45 deletions

View File

@ -1,11 +1,11 @@
package cmd package cmd
import ( import (
"bufio"
"fmt" "fmt"
"os/exec" "log"
"strings" "time"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -16,56 +16,57 @@ var sourceFetchCmd = &cobra.Command{
Items returned by a successful fetch will be used to update the source. Items returned by a successful fetch will be used to update the source.
A fetch is successful if all items output by the fetch are parsed successfully A fetch is successful if all items output by the fetch are parsed successfully
and the exit code is 0. No changes will be made to the source if the fetch and the exit code is 0. No changes will be made to the source if the fetch
does not succeed.`, does not succeed.
In a dry run, the items will be printed according to the chosen format.`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
sourceFetch() sourceFetch()
}, },
} }
var source string var sourceFetchSource string
var dryRun bool var sourceFetchFormat string
var sourceFetchDryRun bool
func init() { func init() {
sourceCmd.AddCommand(sourceFetchCmd) sourceCmd.AddCommand(sourceFetchCmd)
sourceFetchCmd.Flags().StringVarP(&source, "source", "s", "", "Source name to fetch (required)") sourceFetchCmd.Flags().StringVarP(&sourceFetchSource, "source", "s", "", "Source name to fetch (required)")
sourceFetchCmd.MarkFlagRequired("source") sourceFetchCmd.MarkFlagRequired("source")
sourceFetchCmd.Flags().BoolVar(&dryRun, "dry-run", false, "Instead of updating the source, print the fetched items") sourceFetchCmd.Flags().StringVarP(&sourceFetchFormat, "format", "f", "headlines", "Feed format for returned items.")
sourceFetchCmd.Flags().BoolVar(&sourceFetchDryRun, "dry-run", false, "Instead of updating the source, print the fetched items")
} }
func sourceFetch() { func sourceFetch() {
fmt.Printf("Hello %s\n", source) formatter, err := core.FormatAs(sourceFetchFormat)
if err != nil {
log.Fatalf("error: %v", err)
}
if dryRun { db := openAndMigrateDb()
argv, err := core.GetArgvForAction(db, sourceFetchSource, "fetch")
if err != nil {
log.Fatalf("error: failed to get fetch action: %v", err)
}
items, err := core.Execute(sourceFetchSource, argv, nil, "", time.Minute)
if err != nil {
log.Fatalf("error: %v", err)
}
if sourceFetchDryRun {
log.Printf("Fetch returned %d items", len(items))
for _, item := range items {
fmt.Println(formatter(item))
}
return return
} }
cmd := exec.Command("sh", "-c", "echo Wout; sleep 1; echo 1>&2 Werr; sleep 1; echo Wout2; sleep 1; echo 1>&2 Werr2") added, deleted, err := core.UpdateWithFetchedItems(db, sourceFetchSource, items)
stdout, _ := cmd.StdoutPipe() if err != nil {
stderr, _ := cmd.StderrPipe() log.Fatalf("error: %v", err)
cout := make(chan int)
go func() {
scanout := bufio.NewScanner(stdout)
for scanout.Scan() {
text := strings.TrimSpace(scanout.Text())
fmt.Printf("[stdout] %s\n", text)
} }
cout <- 1 log.Printf("%s added %d items, updated %d items, and deleted %d items", sourceFetchSource, added, len(items)-added, deleted)
}()
cerr := make(chan int)
go func() {
scanerr := bufio.NewScanner(stderr)
for scanerr.Scan() {
text := strings.TrimSpace(scanerr.Text())
fmt.Printf("[stderr] %s\n", text)
}
cerr <- 1
}()
cmd.Start()
<-cout
<-cerr
} }

View File

@ -41,7 +41,7 @@ func sourceTest(cmd []string) {
log.Fatal(err) log.Fatal(err)
} }
items, err := core.Execute(cmd, testEnv, "", time.Minute) items, err := core.Execute("", cmd, testEnv, "", time.Minute)
log.Printf("Returned %d items", len(items)) log.Printf("Returned %d items", len(items))
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)

View File

@ -85,7 +85,7 @@ func DeleteAction(db *DB, source string, name string) error {
return err return err
} }
func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) { func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) {
var item Item var item Item
parseError := false parseError := false
scanout := bufio.NewScanner(stdout) scanout := bufio.NewScanner(stdout)
@ -93,10 +93,13 @@ func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) {
data := scanout.Bytes() data := scanout.Bytes()
err := json.Unmarshal(data, &item) err := json.Unmarshal(data, &item)
if err != nil { if err != nil {
log.Printf("[stdout] %s\n", strings.TrimSpace(string(data))) log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data)))
parseError = true parseError = true
} else { } else {
log.Printf("[item] %s\n", item.Id) item.Active = true // These fields aren't up to
item.Created = 0 // the action to set and
item.Source = source // shouldn't be overrideable
log.Printf("[%s: item] %s\n", source, item.Id)
items <- item items <- item
} }
} }
@ -105,11 +108,11 @@ func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) {
close(items) close(items)
} }
func readStderr(stderr io.ReadCloser, done chan bool) { func readStderr(stderr io.ReadCloser, source string, done chan bool) {
scanerr := bufio.NewScanner(stderr) scanerr := bufio.NewScanner(stderr)
for scanerr.Scan() { for scanerr.Scan() {
text := strings.TrimSpace(scanerr.Text()) text := strings.TrimSpace(scanerr.Text())
log.Printf("[stderr] %s\n", text) log.Printf("[%s: stderr] %s\n", source, text)
} }
done <- true done <- true
} }
@ -120,6 +123,7 @@ func writeStdin(stdin io.WriteCloser, text string) {
} }
func Execute( func Execute(
source string,
argv []string, argv []string,
env []string, env []string,
input string, input string,
@ -167,8 +171,8 @@ func Execute(
// Routines handling the process i/o // Routines handling the process i/o
go writeStdin(stdin, input) go writeStdin(stdin, input)
go readStdout(stdout, cout, cparse) go readStdout(stdout, source, cout, cparse)
go readStderr(stderr, cerr) go readStderr(stderr, source, cerr)
// Kick off the command // Kick off the command
err = cmd.Start() err = cmd.Start()

View File

@ -73,6 +73,7 @@ func TestExecute(t *testing.T) {
} }
res, err := Execute( res, err := Execute(
"",
[]string{"true"}, []string{"true"},
nil, nil,
"", "",
@ -83,6 +84,7 @@ func TestExecute(t *testing.T) {
// Exit with error code // Exit with error code
res, err = Execute( res, err = Execute(
"",
[]string{"false"}, []string{"false"},
nil, nil,
"", "",
@ -92,6 +94,7 @@ func TestExecute(t *testing.T) {
assertLen(res, 0) assertLen(res, 0)
res, err = Execute( res, err = Execute(
"",
[]string{"sh", "-c", "exit 22"}, []string{"sh", "-c", "exit 22"},
nil, nil,
"", "",
@ -102,6 +105,7 @@ func TestExecute(t *testing.T) {
// Timeout // Timeout
res, err = Execute( res, err = Execute(
"",
[]string{"sleep", "10"}, []string{"sleep", "10"},
nil, nil,
"", "",
@ -112,6 +116,7 @@ func TestExecute(t *testing.T) {
// Returning items // Returning items
res, err = Execute( res, err = Execute(
"",
[]string{"jq", "-cn", `{id: "foo"}`}, []string{"jq", "-cn", `{id: "foo"}`},
nil, nil,
"", "",
@ -125,6 +130,7 @@ func TestExecute(t *testing.T) {
// Read from stdin // Read from stdin
res, err = Execute( res, err = Execute(
"",
[]string{"jq", "-cR", `{id: .}`}, []string{"jq", "-cR", `{id: .}`},
nil, nil,
"bar", "bar",
@ -138,6 +144,7 @@ func TestExecute(t *testing.T) {
// Set env // Set env
res, err = Execute( res, err = Execute(
"",
[]string{"jq", "-cn", `{id: env.HELLO}`}, []string{"jq", "-cn", `{id: env.HELLO}`},
[]string{"HELLO=baz"}, []string{"HELLO=baz"},
"", "",
@ -151,6 +158,7 @@ func TestExecute(t *testing.T) {
// With logging on stderr // With logging on stderr
res, err = Execute( res, err = Execute(
"",
[]string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World`}, []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World`},
nil, nil,
"", "",

View File

@ -18,6 +18,11 @@ type Item struct {
Time int `json:"time"` Time int `json:"time"`
} }
// Deletable reports whether the item may be removed once it no longer
// appears in the results of a fetch. An item qualifies only when it is not
// active.
func (item Item) Deletable() bool {
	stillActive := item.Active
	return !stillActive
}
func FormatAsHeadline(item Item) string { func FormatAsHeadline(item Item) string {
title := item.Title title := item.Title
if title == "" { if title == "" {

View File

@ -63,6 +63,25 @@ func AddItem(
return err return err
} }
// AddItems inserts every item in items into the items table, running all of
// the inserts inside one db.Transact call with a single prepared statement.
//
// Each row is written with active = true; the Active field of the passed
// items is not consulted. The Created field is not written by this statement.
//
// An empty or nil items slice is a no-op and returns nil without touching the
// database. Otherwise the first error from preparing or executing the insert
// is returned to db.Transact unchanged.
// NOTE(review): presumably Transact rolls the whole batch back when the
// callback returns an error — confirm against its implementation.
func AddItems(db *DB, items []Item) error {
	// Nothing to insert, so don't open a transaction or prepare a statement.
	if len(items) == 0 {
		return nil
	}
	return db.Transact(func(tx *sql.Tx) error {
		stmt, err := tx.Prepare(`
			insert into items (source, id, active, title, author, body, link, time)
			values (?, ?, ?, ?, ?, ?, ?, ?)
		`)
		if err != nil {
			return err
		}
		// Release the statement as soon as the batch is done rather than
		// relying on the end of the transaction to clean it up.
		defer stmt.Close()

		for _, item := range items {
			if _, err := stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time); err != nil {
				return err
			}
		}
		return nil
	})
}
// Deactivate an item, returning its previous active state. // Deactivate an item, returning its previous active state.
func DeactivateItem(db *DB, source string, id string) (bool, error) { func DeactivateItem(db *DB, source string, id string) (bool, error) {
row := db.QueryRow(` row := db.QueryRow(`
@ -153,3 +172,62 @@ func GetAllItemsForSource(db *DB, source string) ([]Item, error) {
source = ? source = ?
`, source) `, source)
} }
// UpdateWithFetchedItems reconciles the items stored for source with the
// items returned by a fetch.
//
//   - Fetched items whose id is not yet stored for source are inserted with
//     AddItems.
//   - Fetched items whose id is already stored are currently left untouched;
//     updating them is still a TODO.
//   - Stored items whose id is absent from the fetch are removed with
//     DeleteItem, but only if they report Deletable().
//
// It returns the number of items added and the number of items deleted. On
// error both counts are zero, even though earlier steps may already have been
// applied: the insert and each delete are issued as separate calls.
func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) {
	// Index what is already stored for this source.
	existingItems, err := GetAllItemsForSource(db, source)
	if err != nil {
		return 0, 0, err
	}
	existingIDs := make(map[string]struct{}, len(existingItems))
	for _, item := range existingItems {
		existingIDs[item.Id] = struct{}{}
	}

	// One pass over the fetch: remember every fetched id and pick out the
	// items that are not stored yet.
	fetchedIDs := make(map[string]struct{}, len(items))
	var newItems []Item
	for _, item := range items {
		fetchedIDs[item.Id] = struct{}{}
		if _, stored := existingIDs[item.Id]; !stored {
			newItems = append(newItems, item)
		}
	}

	// Bulk insert the new items.
	if err = AddItems(db, newItems); err != nil {
		return 0, 0, err
	}

	// TODO: bulk update the fetched items that were already stored.

	// Delete stored items that have dropped out of the fetch and are
	// deletable. Walking existingItems (instead of a map) keeps the deletion
	// order stable from run to run.
	// NOTE(review): assumes GetAllItemsForSource returns each id at most once
	// per source — confirm against the table's key.
	deleted := 0
	for _, item := range existingItems {
		if _, stillFetched := fetchedIDs[item.Id]; stillFetched || !item.Deletable() {
			continue
		}
		if _, err = DeleteItem(db, source, item.Id); err != nil {
			return 0, 0, err
		}
		deleted++
	}

	return len(newItems), deleted, nil
}

View File

@ -118,3 +118,45 @@ func TestAddItem(t *testing.T) {
t.Fatal("should get one item") t.Fatal("should get one item")
} }
} }
// TestUpdateSourceAddAndDelete drives one source through a series of fetches
// and checks the added/deleted counts reported by UpdateWithFetchedItems
// after each one, including the case where a deactivated item drops out of
// the fetch and is deleted.
func TestUpdateSourceAddAndDelete(t *testing.T) {
	db := EphemeralDb(t)
	if err := AddSource(db, "test"); err != nil {
		t.Fatal(err)
	}

	// fetch applies one fetch result to the "test" source and fails the test
	// unless it succeeds with exactly the expected counts.
	fetch := func(wantAdd, wantDel int, fetched ...Item) {
		t.Helper()
		add, del, err := UpdateWithFetchedItems(db, "test", fetched)
		if err != nil || add != wantAdd || del != wantDel {
			t.Fatalf("update failed: add %d, del %d, err %v", add, del, err)
		}
	}

	a := Item{Source: "test", Id: "a"}
	b := Item{Source: "test", Id: "b"}

	// First sighting of a is an add; fetching it again changes nothing.
	fetch(1, 0, a)
	fetch(0, 0, a)

	// b shows up alongside a.
	fetch(1, 0, a, b)

	// A deactivated item is kept for as long as the fetch still returns it.
	if _, err := DeactivateItem(db, "test", "a"); err != nil {
		t.Fatal(err)
	}
	fetch(0, 0, a, b)

	// Once a drops out of the fetch it is deleted, and only once.
	fetch(0, 1, b)
	fetch(0, 0, b)
}