Source-level timeouts

This commit is contained in:
Tim Van Baak 2025-03-31 07:17:46 -07:00
parent 283e6a15f0
commit 1a3309ae54
7 changed files with 101 additions and 16 deletions

View File

@ -137,7 +137,7 @@ Instead, the web interface can be locked behind a password set via `intake passw
* [ ] TUI feed view
* [ ] Escape HTML in logs saved to error items
* [ ] Mark which sources have INTAKE_FETCH and which don't
* [ ] Source-level execution timeout
* [x] Source-level execution timeout
* [ ] Support fetch specs as batch specs so you can e.g. batch to a dow
* [ ] Allow desktop-only sources that are invisible on mobile
* [ ] Items supporting a nonexistent action should gray out the action button

View File

@ -64,7 +64,7 @@ func init() {
actionCmd.PersistentFlags().StringP("item", "i", "", "Item to run action on")
actionCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items")
actionCmd.Flags().StringP("timeout", "t", core.DefaultTimeout.String(), "Timeout duration")
actionCmd.Flags().StringP("timeout", "t", "", "Timeout duration")
actionCmd.Flags().BoolP("dry-run", "n", false, "Instead of updating the item, print it")
actionCmd.Flags().Bool("diff", false, "Show which fields of the item changed")
actionCmd.Flags().Bool("force", false, "Execute the action even if the item does not support it")
@ -88,10 +88,6 @@ func action(
log.Fatal("error: --action is empty")
}
formatter := formatAs(format)
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
db := openAndMigrateDb()
@ -112,6 +108,21 @@ func action(
return
}
if timeout == "" {
var err error
timeout, err = core.GetSourceTimeout(db, source)
if err != nil {
log.Fatalf("error: %v", err)
}
}
if timeout == "" {
timeout = core.DefaultTimeout.String()
}
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
state, envs, argv, err := core.GetSourceActionInputs(db, source, action)
if err != nil {
log.Fatalf("error: failed to load data for %s: %v", source, err)

View File

@ -64,7 +64,7 @@ func init() {
fetchCmd.Flags().StringArrayP("env", "e", nil, "Environment variables to set, in the form KEY=VAL")
fetchCmd.Flags().BoolP("dry-run", "n", false, "Do not update the source with the fetch results")
fetchCmd.Flags().StringP("format", "f", "headlines", "Feed format for returned items.")
fetchCmd.Flags().StringP("timeout", "t", core.DefaultTimeout.String(), "Timeout duration")
fetchCmd.Flags().StringP("timeout", "t", "", "Timeout duration")
}
func fetch(
@ -77,11 +77,6 @@ func fetch(
) {
formatter := formatAs(format)
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
if source == "" && len(argv) == 0 {
log.Fatal("Either --source or an argv must be provided")
}
@ -91,6 +86,21 @@ func fetch(
db := openAndMigrateDb()
if timeout == "" && source != "" {
var err error
timeout, err = core.GetSourceTimeout(db, source)
if err != nil {
log.Fatalf("error: %v", err)
}
}
if timeout == "" {
timeout = core.DefaultTimeout.String()
}
duration, err := time.ParseDuration(timeout)
if err != nil {
log.Fatalf("error: invalid duration: %v", err)
}
var fSource string
var fArgv []string
var fEnvs []string

View File

@ -203,13 +203,25 @@ func fetchReadySources(db DB) {
}
log.Printf("%s: fetching", schedule.Source)
duration := DefaultTimeout
timeout, err := GetSourceTimeout(db, schedule.Source)
if err != nil {
log.Printf("error: failed to get source timeout, using default: %v", err)
} else if timeout != "" {
duration, err = time.ParseDuration(timeout)
if err != nil {
log.Printf("error: invalid duration: %v", err)
duration = DefaultTimeout
}
}
state, envs, argv, err := GetSourceActionInputs(db, schedule.Source, "fetch")
if err != nil {
log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
continue
}
items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", DefaultTimeout)
items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", duration)
if err != nil {
AddErrorItem(db, errItem)
log.Printf("error: failed to execute fetch: %v", err)

View File

@ -86,6 +86,22 @@ func BumpLastUpdated(db DB, source string, now time.Time) error {
return err
}
func GetSourceTimeout(db DB, source string) (timeout string, err error) {
row := db.QueryRow(`
select value
from envs
where source = ?
and name = 'INTAKE_TIMEOUT'
`, source)
if err := row.Scan(&timeout); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", nil
}
return "", err
}
return timeout, err
}
func getSourceTtx(db DB, source string, env string) (int, error) {
row := db.QueryRow(`
select value
@ -240,6 +256,18 @@ func updateWithFetchedItemsTx(
log.Printf("error: failed to get on_create action for %s: %v", source, err)
}
duration := DefaultTimeout
timeout, err := GetSourceTimeout(db, source)
if err != nil {
log.Printf("error: failed to get source timeout, using default: %v", err)
} else if timeout != "" {
duration, err = time.ParseDuration(timeout)
if err != nil {
log.Printf("error: invalid duration: %v", err)
duration = DefaultTimeout
}
}
// Get all existing items
existingItems, err := GetAllItemsForSource(db, source, 0, -1)
if err != nil {
@ -290,7 +318,7 @@ func updateWithFetchedItemsTx(
for _, item := range newItems {
var updatedItem Item
var errItem Item
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, DefaultTimeout)
updatedItem, state, errItem, err = ExecuteItemAction(item, onCreateArgv, envs, state, duration)
if err != nil {
AddErrorItem(db, errItem)
log.Printf("error: on_create failed for %s/%s: %v", item.Source, item.Id, err)

View File

@ -69,6 +69,18 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
id := req.PathValue("id")
action := req.PathValue("action")
duration := core.DefaultTimeout
timeout, err := core.GetSourceTimeout(env.db, source)
if err != nil {
log.Printf("error: failed to get source timeout, using default: %v", err)
} else if timeout != "" {
duration, err = time.ParseDuration(timeout)
if err != nil {
log.Printf("error: invalid duration: %v", err)
duration = core.DefaultTimeout
}
}
state, envs, argv, err := core.GetSourceActionInputs(env.db, source, action)
if err != nil {
http.Error(writer, fmt.Sprintf("error: failed to load data for %s: %v", source, err), 500)
@ -85,7 +97,7 @@ func (env *Env) doAction(writer http.ResponseWriter, req *http.Request) {
return
}
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, core.DefaultTimeout)
newItem, newState, errItem, err := core.ExecuteItemAction(item, argv, envs, state, duration)
if err != nil {
core.AddErrorItem(env.db, errItem)
http.Error(writer, err.Error(), 500)

View File

@ -57,13 +57,25 @@ func (env *Env) fetchSource(writer http.ResponseWriter, req *http.Request) {
return
}
duration := core.DefaultTimeout
timeout, err := core.GetSourceTimeout(env.db, source)
if err != nil {
log.Printf("error: failed to get source timeout, using default: %v", err)
} else if timeout != "" {
duration, err = time.ParseDuration(timeout)
if err != nil {
log.Printf("error: invalid duration: %v", err)
duration = core.DefaultTimeout
}
}
state, envs, argv, err := core.GetSourceActionInputs(env.db, source, "fetch")
if err != nil {
http.Error(writer, fmt.Sprintf("error: failed to get data for %s: %v", source, err.Error()), 500)
return
}
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", core.DefaultTimeout)
items, newState, errItem, err := core.Execute(source, argv, envs, state, "", duration)
if err != nil {
core.AddErrorItem(env.db, errItem)
http.Error(writer, fmt.Sprintf("error: failed to execute fetch: %v", err.Error()), 500)