diff --git a/cmd/sourceFetch.go b/cmd/sourceFetch.go index eb8de2b..f387b9d 100644 --- a/cmd/sourceFetch.go +++ b/cmd/sourceFetch.go @@ -1,11 +1,11 @@ package cmd import ( - "bufio" "fmt" - "os/exec" - "strings" + "log" + "time" + "github.com/Jaculabilis/intake/core" "github.com/spf13/cobra" ) @@ -16,56 +16,57 @@ var sourceFetchCmd = &cobra.Command{ Items returned by a successful fetch will be used to update the source. A fetch is successful if all items output by the fetch are parsed successfully and the exit code is 0. No changes will be made to the source if the fetch -does not succeed.`, +does not succeed. + +In a dry run, the items will be printed according to the chosen format.`, Run: func(cmd *cobra.Command, args []string) { sourceFetch() }, } -var source string -var dryRun bool +var sourceFetchSource string +var sourceFetchFormat string +var sourceFetchDryRun bool func init() { sourceCmd.AddCommand(sourceFetchCmd) - sourceFetchCmd.Flags().StringVarP(&source, "source", "s", "", "Source name to fetch (required)") + sourceFetchCmd.Flags().StringVarP(&sourceFetchSource, "source", "s", "", "Source name to fetch (required)") sourceFetchCmd.MarkFlagRequired("source") - sourceFetchCmd.Flags().BoolVar(&dryRun, "dry-run", false, "Instead of updating the source, print the fetched items") + sourceFetchCmd.Flags().StringVarP(&sourceFetchFormat, "format", "f", "headlines", "Feed format for returned items.") + sourceFetchCmd.Flags().BoolVar(&sourceFetchDryRun, "dry-run", false, "Instead of updating the source, print the fetched items") } func sourceFetch() { - fmt.Printf("Hello %s\n", source) + formatter, err := core.FormatAs(sourceFetchFormat) + if err != nil { + log.Fatalf("error: %v", err) + } - if dryRun { + db := openAndMigrateDb() + + argv, err := core.GetArgvForAction(db, sourceFetchSource, "fetch") + if err != nil { + log.Fatalf("error: failed to get fetch action: %v", err) + } + + items, err := core.Execute(sourceFetchSource, argv, nil, "", time.Minute) + if err != nil { + log.Fatalf("error: %v", err) + } + + if sourceFetchDryRun { + log.Printf("Fetch returned %d items", len(items)) + for _, item := range items { + fmt.Println(formatter(item)) + } return } - cmd := exec.Command("sh", "-c", "echo Wout; sleep 1; echo 1>&2 Werr; sleep 1; echo Wout2; sleep 1; echo 1>&2 Werr2") - stdout, _ := cmd.StdoutPipe() - stderr, _ := cmd.StderrPipe() - - cout := make(chan int) - go func() { - scanout := bufio.NewScanner(stdout) - for scanout.Scan() { - text := strings.TrimSpace(scanout.Text()) - fmt.Printf("[stdout] %s\n", text) - } - cout <- 1 - }() - - cerr := make(chan int) - go func() { - scanerr := bufio.NewScanner(stderr) - for scanerr.Scan() { - text := strings.TrimSpace(scanerr.Text()) - fmt.Printf("[stderr] %s\n", text) - } - cerr <- 1 - }() - - cmd.Start() - <-cout - <-cerr + added, deleted, err := core.UpdateWithFetchedItems(db, sourceFetchSource, items) + if err != nil { + log.Fatalf("error: %v", err) + } + log.Printf("%s added %d items, updated %d items, and deleted %d items", sourceFetchSource, added, len(items)-added, deleted) } diff --git a/cmd/sourceTest.go b/cmd/sourceTest.go index e5edd7b..00bf823 100644 --- a/cmd/sourceTest.go +++ b/cmd/sourceTest.go @@ -41,7 +41,7 @@ func sourceTest(cmd []string) { log.Fatal(err) } - items, err := core.Execute(cmd, testEnv, "", time.Minute) + items, err := core.Execute("", cmd, testEnv, "", time.Minute) log.Printf("Returned %d items", len(items)) if err != nil { log.Fatal(err) diff --git a/core/action.go b/core/action.go index f204724..4bce5b9 100644 --- a/core/action.go +++ b/core/action.go @@ -85,7 +85,7 @@ func DeleteAction(db *DB, source string, name string) error { return err } -func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) { +func readStdout(stdout io.ReadCloser, source string, items chan Item, cparse chan bool) { var item Item parseError := false scanout := bufio.NewScanner(stdout) @@ -93,10 +93,13 @@ func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) { data := scanout.Bytes() err := json.Unmarshal(data, &item) if err != nil { - log.Printf("[stdout] %s\n", strings.TrimSpace(string(data))) + log.Printf("[%s: stdout] %s\n", source, strings.TrimSpace(string(data))) parseError = true } else { - log.Printf("[item] %s\n", item.Id) + item.Active = true // These fields aren't up to + item.Created = 0 // the action to set and + item.Source = source // shouldn't be overrideable + log.Printf("[%s: item] %s\n", source, item.Id) items <- item } } @@ -105,11 +108,11 @@ func readStdout(stdout io.ReadCloser, items chan Item, cparse chan bool) { close(items) } -func readStderr(stderr io.ReadCloser, done chan bool) { +func readStderr(stderr io.ReadCloser, source string, done chan bool) { scanerr := bufio.NewScanner(stderr) for scanerr.Scan() { text := strings.TrimSpace(scanerr.Text()) - log.Printf("[stderr] %s\n", text) + log.Printf("[%s: stderr] %s\n", source, text) } done <- true } @@ -120,6 +123,7 @@ func writeStdin(stdin io.WriteCloser, text string) { } func Execute( + source string, argv []string, env []string, input string, @@ -167,8 +171,8 @@ func Execute( // Routines handling the process i/o go writeStdin(stdin, input) - go readStdout(stdout, cout, cparse) - go readStderr(stderr, cerr) + go readStdout(stdout, source, cout, cparse) + go readStderr(stderr, source, cerr) // Kick off the command err = cmd.Start() diff --git a/core/action_test.go b/core/action_test.go index fb8672c..a4fe599 100644 --- a/core/action_test.go +++ b/core/action_test.go @@ -73,6 +73,7 @@ func TestExecute(t *testing.T) { } res, err := Execute( + "", []string{"true"}, nil, "", @@ -83,6 +84,7 @@ func TestExecute(t *testing.T) { // Exit with error code res, err = Execute( + "", []string{"false"}, nil, "", @@ -92,6 +94,7 @@ func TestExecute(t *testing.T) { assertLen(res, 0) res, err = Execute( + "", []string{"sh", "-c", "exit 22"}, nil, "", @@ -102,6 +105,7 @@ func TestExecute(t *testing.T) { // Timeout res, err = Execute( + "", []string{"sleep", "10"}, nil, "", @@ -112,6 +116,7 @@ func TestExecute(t *testing.T) { // Returning items res, err = Execute( + "", []string{"jq", "-cn", `{id: "foo"}`}, nil, "", @@ -125,6 +130,7 @@ func TestExecute(t *testing.T) { // Read from stdin res, err = Execute( + "", []string{"jq", "-cR", `{id: .}`}, nil, "bar", @@ -138,6 +144,7 @@ func TestExecute(t *testing.T) { // Set env res, err = Execute( + "", []string{"jq", "-cn", `{id: env.HELLO}`}, []string{"HELLO=baz"}, "", @@ -151,6 +158,7 @@ func TestExecute(t *testing.T) { // With logging on stderr res, err = Execute( + "", []string{"sh", "-c", `echo 1>&2 Hello; jq -cn '{id: "box"}'; echo 1>&2 World`}, nil, "", diff --git a/core/item.go b/core/item.go index cf121e0..3b97755 100644 --- a/core/item.go +++ b/core/item.go @@ -18,6 +18,11 @@ type Item struct { Time int `json:"time"` } +// Whether an item that no longer appears in a fetch can be deleted. +func (item Item) Deletable() bool { + return !item.Active +} + func FormatAsHeadline(item Item) string { title := item.Title if title == "" { diff --git a/core/source.go b/core/source.go index c77bf12..d29340b 100644 --- a/core/source.go +++ b/core/source.go @@ -63,6 +63,25 @@ func AddItem( return err } +func AddItems(db *DB, items []Item) error { + return db.Transact(func(tx *sql.Tx) error { + stmt, err := tx.Prepare(` + insert into items (source, id, active, title, author, body, link, time) + values (?, ?, ?, ?, ?, ?, ?, ?) + `) + if err != nil { + return err + } + for _, item := range items { + _, err = stmt.Exec(item.Source, item.Id, true, item.Title, item.Author, item.Body, item.Link, item.Time) + if err != nil { + return err + } + } + return nil + }) +} + // Deactivate an item, returning its previous active state. func DeactivateItem(db *DB, source string, id string) (bool, error) { row := db.QueryRow(` @@ -153,3 +172,62 @@ func GetAllItemsForSource(db *DB, source string) ([]Item, error) { source = ? `, source) } + +// Given the results of a fetch, add new items, update existing items, and delete expired items. +func UpdateWithFetchedItems(db *DB, source string, items []Item) (int, int, error) { + // Get the existing items + existingItems, err := GetAllItemsForSource(db, source) + if err != nil { + return 0, 0, err + } + existingIds := map[string]bool{} + for _, item := range existingItems { + existingIds[item.Id] = true + } + + // Split the fetch into adds and updates + var newItems []Item + var updatedItems []Item + for _, item := range items { + if existingIds[item.Id] { + updatedItems = append(updatedItems, item) + } else { + newItems = append(newItems, item) + } + } + + // Bulk insert the new items + if err = AddItems(db, newItems); err != nil { + return 0, 0, err + } + + // Bulk update the existing items + // TODO + + // Get the list of expired items + fetchedIds := map[string]bool{} + for _, item := range items { + fetchedIds[item.Id] = true + } + expiredIds := map[string]bool{} + for id := range existingIds { + expiredIds[id] = !fetchedIds[id] + } + + // Check expired items for deletion + idsToDelete := map[string]bool{} + for _, item := range existingItems { + if expiredIds[item.Id] && item.Deletable() { + idsToDelete[item.Id] = true + } + } + + // Delete each item to be deleted + for id := range idsToDelete { + if _, err = DeleteItem(db, source, id); err != nil { + return 0, 0, err + } + } + + return len(newItems), len(idsToDelete), nil +} diff --git a/core/source_test.go b/core/source_test.go index d866677..07bc8d0 100644 --- a/core/source_test.go +++ b/core/source_test.go @@ -118,3 +118,45 @@ func TestAddItem(t *testing.T) { t.Fatal("should get one item") } } + +func TestUpdateSourceAddAndDelete(t *testing.T) { + db := EphemeralDb(t) + if err := AddSource(db, "test"); err != nil { + t.Fatal(err) + } + + a := Item{Source: "test", Id: "a"} + add, del, err := UpdateWithFetchedItems(db, "test", []Item{a}) + if add != 1 || del != 0 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } + + add, del, err = UpdateWithFetchedItems(db, "test", []Item{a}) + if add != 0 || del != 0 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } + + b := Item{Source: "test", Id: "b"} + add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) + if add != 1 || del != 0 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } + + if _, err = DeactivateItem(db, "test", "a"); err != nil { + t.Fatal(err) + } + add, del, err = UpdateWithFetchedItems(db, "test", []Item{a, b}) + if add != 0 || del != 0 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } + + add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) + if add != 0 || del != 1 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } + + add, del, err = UpdateWithFetchedItems(db, "test", []Item{b}) + if add != 0 || del != 0 || err != nil { + t.Fatalf("update failed: add %d, del %d, err %v", add, del, err) + } +}