Unified fetch command

This commit is contained in:
Tim Van Baak 2025-03-07 09:48:44 -08:00
parent 5213045a33
commit fabd5110c3
3 changed files with 142 additions and 141 deletions

142
cmd/fetch.go Normal file
View File

@ -0,0 +1,142 @@
package cmd
import (
"fmt"
"log"
"slices"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
var fetchCmd = &cobra.Command{
Use: "fetch [--source name] [-n|--dry-run] [--env NAME-VALUE [-- env NAME=VALUE ...]] [-- argv...]",
Short: "Fetch items for a source and update the feed",
Long: fmt.Sprintf(`Fetch items for a source and update the feed.
Items returned by a successful fetch will be used to update the source.
A fetch is successful if all items output by the fetch are parsed successfully
and the exit code is 0. No changes will be made to the source if the fetch
does not succeed.
By default, fetch uses the configured "fetch" action on the specified source.
You can provide an alternative argv, execute as a dry run to make no changes,
or execute an argv outside of a source to test it. If a source is not specified,
a dry run is implied.
In a dry run, the items will be printed according to the chosen format and
the source will not be updated with the fetch result.
Source-level behavior configured by environment variables, such as INTAKE_TTL,
will not be applied by --env.
%s`, makeFormatHelpText()),
Example: ` Fetch items for a source:
intake fetch --source name
Fetch for a source using an alternative fetch action:
intake fetch --source name -- command args
Test a fetch action in isolation:
intake fetch -- command args
Test a fetch action with a source's state and environment:
intake fetch --source name --dry-run -- command args`,
Run: func(cmd *cobra.Command, args []string) {
fetch(
stringArg(cmd, "source"),
stringArrayArg(cmd, "env"),
stringArg(cmd, "format"),
stringArg(cmd, "timeout"),
boolArg(cmd, "dry-run"),
getArgv(cmd, args),
)
},
DisableFlagsInUseLine: true,
}
func init() {
rootCmd.AddCommand(fetchCmd)
fetchCmd.Flags().StringP("source", "s", "", "Source name to fetch (required)")
fetchCmd.Flags().StringArrayP("env", "e", nil, "Environment variables to set, in the form KEY=VAL")
fetchCmd.Flags().BoolP("dry-run", "n", false, "Do not update the source with the fetch results")
fetchCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
fetchCmd.Flags().StringP("timeout", "t", core.DefaultTimeout.String(), "Timeout duration")
}
func fetch(
source string,
envs []string,
format string,
timeout string,
dryRun bool,
argv []string,
) {
formatter := formatAs(format)
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
if source == "" && len(argv) == 0 {
log.Fatal("Either --source or an argv must be provided")
}
// No source implies dry run because there's no source to update
dryRun = dryRun || source == ""
db := openAndMigrateDb()
var fSource string
var fArgv []string
var fEnvs []string
var fState []byte = nil
if source == "" {
fSource = "test"
fArgv = argv
fEnvs = envs
fState = nil
} else {
sourceState, sourceEnvs, sourceArgv, err := core.GetSourceActionInputs(db, source, "fetch")
if err != nil {
log.Fatalf("error: failed to load data for %s: %v", source, err)
}
fSource = source
if len(argv) > 0 {
fArgv = argv
} else {
fArgv = sourceArgv
}
fEnvs = append(sourceEnvs, envs...)
fState = sourceState
}
items, newState, errItem, err := core.Execute(fSource, fArgv, fEnvs, fState, "", duration)
if err != nil {
if !dryRun {
core.AddErrorItem(db, errItem)
}
log.Fatalf("error: failed to execute fetch: %v", err)
}
if dryRun {
log.Printf("fetch returned %d items", len(items))
if !slices.Equal(fState, newState) {
log.Printf("state update (%d => %d bytes)", len(fState), len(newState))
}
for _, item := range items {
fmt.Println(formatter(item))
}
return
}
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items, time.Now())
if err != nil {
log.Fatalf("error: failed to update: %v", err)
}
log.Printf("%s added %d items, updated %d items, and deleted %d items", source, added, len(items)-added, deleted)
}

View File

@ -1,85 +0,0 @@
package cmd
import (
"fmt"
"log"
"slices"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
var sourceFetchCmd = &cobra.Command{
Use: "fetch",
Short: "Fetch items for a source and update the feed",
Long: fmt.Sprintf(`Fetch items from a feed source using the configured "fetch" action.
Items returned by a successful fetch will be used to update the source.
A fetch is successful if all items output by the fetch are parsed successfully
and the exit code is 0. No changes will be made to the source if the fetch
does not succeed.
In a dry run, the items will be printed according to the chosen format and
the source will not be updated with the fetch result.
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
sourceFetch(
stringArg(cmd, "source"),
stringArg(cmd, "format"),
stringArg(cmd, "timeout"),
boolArg(cmd, "dry-run"),
)
},
}
func init() {
sourceCmd.AddCommand(sourceFetchCmd)
sourceFetchCmd.Flags().StringP("source", "s", "", "Source name to fetch (required)")
sourceFetchCmd.MarkFlagRequired("source")
sourceFetchCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
sourceFetchCmd.Flags().Bool("dry-run", false, "Instead of updating the source, print the fetched items")
sourceFetchCmd.Flags().StringP("timeout", "t", core.DefaultTimeout.String(),
fmt.Sprintf("Timeout duration (default: %s)", core.DefaultTimeout.String()))
}
func sourceFetch(source string, format string, timeout string, dryRun bool) {
formatter := formatAs(format)
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
db := openAndMigrateDb()
state, envs, argv, err := core.GetSourceActionInputs(db, source, "fetch")
if err != nil {
log.Fatalf("error: failed to load data for %s: %v", source, err)
}
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", duration)
if err != nil {
core.AddErrorItem(db, errItem)
log.Fatalf("error: failed to execute fetch: %v", err)
}
if dryRun {
log.Printf("Fetch returned %d items", len(items))
if !slices.Equal(state, newState) {
log.Printf("State update (%d => %d bytes)", len(state), len(newState))
}
for _, item := range items {
fmt.Println(formatter(item))
}
return
}
added, deleted, err := core.UpdateWithFetchedItems(db, source, newState, items, time.Now())
if err != nil {
log.Fatalf("error: failed to update: %v", err)
}
log.Printf("%s added %d items, updated %d items, and deleted %d items", source, added, len(items)-added, deleted)
}

View File

@ -1,56 +0,0 @@
package cmd
import (
"fmt"
"log"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
var sourceTestCmd = &cobra.Command{
Use: "test [flags] -- argv",
Short: "Test a fetch action",
Long: fmt.Sprintf(`Execute a command as if it were a feed source's fetch action.
Source-level configuration that is normally set via environment variable,
such as INTAKE_TTL, will not be applied by --env.
%s`, makeFormatHelpText()),
Run: func(cmd *cobra.Command, args []string) {
sourceTest(
stringArrayArg(cmd, "env"),
stringArg(cmd, "format"),
stringArg(cmd, "timeout"),
getArgv(cmd, args),
)
},
}
func init() {
sourceCmd.AddCommand(sourceTestCmd)
sourceTestCmd.Flags().StringArrayP("env", "e", nil, "Environment variables to set, in the form KEY=VAL")
sourceTestCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
sourceTestCmd.Flags().StringP("timeout", "t", core.DefaultTimeout.String(),
fmt.Sprintf("Timeout duration (default: %s)", core.DefaultTimeout.String()))
}
func sourceTest(env []string, format string, timeout string, cmd []string) {
formatter := formatAs(format)
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
items, state, _, err := core.Execute("test", cmd, env, nil, "", duration)
log.Printf("returned %d items, %d bytes of state", len(items), len(state))
if err != nil {
log.Fatal(err)
}
for _, item := range items {
fmt.Println(formatter(item))
}
}