From 76353c4029d93a8a09ab4edf48be629577a11ad0 Mon Sep 17 00:00:00 2001 From: Tim Van Baak Date: Thu, 20 Feb 2025 07:18:51 -0800 Subject: [PATCH] Implement background fetch --- README.md | 22 +++++++++++ cmd/monitor.go | 47 ++++++++++++++++++++++++ cmd/serve.go | 12 +++++- core/cron.go | 91 ++++++++++++++++++++++++++++++++++++++++++++++ test/test_items.sh | 11 +++--- web/main.go | 4 +- 6 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 cmd/monitor.go diff --git a/README.md b/README.md index 8460b88..7daf405 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,26 @@ Intake provides integration with `cron`. To create a cron job for a source, set the `INTAKE_CRON` environment variable to a five-element crontab spec (e.g. `0 0 * * *`). The `intake crontab` command will synchronize source cron jobs to your crontab. +Automatic fetching can be configured by setting the `INTAKE_FETCH` environment variable to a fetch schedule. +A fetch schedule may be: +- `every <duration>`, where `<duration>` is a Go duration string +- `at HH:MM[,HH:MM[...]]`, where HH:MM is an hour and minute +- `on DOW[,DOW[...]] [at ...]`, where DOW is an abbreviated weekday +- `on M/D[,M/D[...]] [at ...]`, where M/D is a month and day + +Examples: +| INTAKE_FETCH | Schedule | +| -------------------- | --------------------------------------------- | +| `every 5m` | Every 5 minutes (00:00, 00:05, ...) | +| `every 1d` | Once per day (at midnight) | +| `every 7d` | Once per week (at midnight Sunday) | +| `at 08:00` | Once per day at 08:00 | +| `at 06:00,18:00` | Twice per day at 6am and 6pm | +| `on Tue,Thu` | Twice a week, on Tue and Thu | +| `on Mon,Fri at 12:00`| Twice a week, at noon on Monday and Friday | +| `on 3/25` | Once a year on March 25 | +| `on */7` | Each month on the 7th | + ### Action API The Intake action API defines how programs should behave to be used with Intake sources. 
@@ -106,6 +126,8 @@ Instead, the web interface can be locked behind a password set via `intake passw
 Parity features
 
 * [ ] source batching
+* [ ] web source add
+* [x] first-party replacement for cron
 * [x] NixOS module
 * [x] NixOS vm demo
 
diff --git a/cmd/monitor.go b/cmd/monitor.go
new file mode 100644
index 0000000..995f5a3
--- /dev/null
+++ b/cmd/monitor.go
@@ -0,0 +1,52 @@
+package cmd
+
+import (
+	"context"
+	"time"
+
+	"github.com/Jaculabilis/intake/core"
+	"github.com/spf13/cobra"
+)
+
+// monitorCmd is the "intake monitor" command; it runs the fetch monitor in
+// the foreground.
+var monitorCmd = &cobra.Command{
+	Use:   "monitor",
+	Short: "Run continuously and update sources according to their fetch schedule",
+	Long: `Run continuously and update sources.
+
+Sources are monitored and fetched according to the schedule specified by
+their INTAKE_FETCH environment variables. A schedule may be:
+
+- "every <duration>", where <duration> is a Go duration string
+- "at HH:MM[,HH:MM[...]]", where HH:MM is an hour and minute
+- "on DOW[,DOW[...]] [at ...]", where DOW is an abbreviated weekday
+- "on M/D[,M/D[...]] [at ...]", where M/D is a month and day
+`,
+	Example: `
+every 5m              Every 5 minutes (00:00, 00:05, ...)
+every 1d              Once per day (at midnight)
+every 7d              Once per week (at midnight Sunday)
+at 08:00              Once per day at 08:00
+at 06:00,18:00        Twice per day at 6am and 6pm
+on Tue,Thu            Twice a week, on Tue and Thu
+on Mon,Fri at 12:00   Twice a week, at noon on Monday and Friday
+on 3/25               Once a year on March 25
+on */7                Each month on the 7th
+`,
+	Run: func(cmd *cobra.Command, args []string) {
+		monitor()
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(monitorCmd)
+}
+
+// monitor obtains the database via openAndMigrateDb and blocks in
+// core.BackgroundFetch, checking schedules once per minute. The background
+// context is never cancelled, so the call's error result is not inspected.
+func monitor() {
+	db := openAndMigrateDb()
+	core.BackgroundFetch(context.Background(), db, time.Minute)
+}
diff --git a/cmd/serve.go b/cmd/serve.go
index 0c9a4bd..f4ee368 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -1,6 +1,11 @@
 package cmd
 
 import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/Jaculabilis/intake/core"
 	"github.com/Jaculabilis/intake/web"
 	"github.com/spf13/cobra"
 )
@@ -24,5 +29,12 @@ func init() {
 
 func serve(addr string, port string) {
 	db := openAndMigrateDb()
-	web.RunServer(db, addr, port)
+
+	go core.BackgroundFetch(context.Background(), db, time.Minute)
+
+	err := web.RunServer(db, addr, port)
+
+	// RunServer returns only when the listener fails. Exit non-zero, as the
+	// log.Fatal that used to live inside RunServer did.
+	log.Fatalf("error: server exited with err: %v", err)
 }
diff --git a/core/cron.go b/core/cron.go
index c9275a4..2362125 100644
--- a/core/cron.go
+++ b/core/cron.go
@@ -1,7 +1,9 @@
 package core
 
 import (
+	"context"
 	"fmt"
+	"log"
 	"strings"
 	"time"
 )
@@ -147,3 +149,112 @@ func parseOnSpec(base time.Time, onSpec string) (nextUpdates []time.Time, err er
 	}
 	return
 }
+
+// Schedule pairs a source name with its INTAKE_FETCH spec and the time the
+// source was last updated.
+type Schedule struct {
+	Source      string
+	Spec        string
+	LastUpdated time.Time
+}
+
+// getFetchSchedules returns a Schedule for every source that has an
+// INTAKE_FETCH environment variable.
+func getFetchSchedules(db DB) (schedules []Schedule, err error) {
+	res, err := db.Query(`
+		select envs.source, envs.value, sources.lastUpdated
+		from envs
+		inner join sources on envs.source = sources.name
+		where envs.name = 'INTAKE_FETCH'
+	`)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get source fetch specs: %v", err)
+	}
+	// Release the result set on every return path, including a failed Scan.
+	defer res.Close()
+
+	for res.Next() {
+		var source, value string
+		// NOTE(review): Scan fails here if sources.lastUpdated can be NULL;
+		// confirm the column is always populated.
+		var lastUpdated int
+		if err = res.Scan(&source, &value, &lastUpdated); err != nil {
+			return nil, fmt.Errorf("failed to scan source fetch spec: %v", err)
+		}
+		schedules = append(schedules, Schedule{source, value, time.Unix(int64(lastUpdated), 0).UTC()})
+	}
+	// Next also returns false when iteration stops on an error.
+	if err = res.Err(); err != nil {
+		return nil, fmt.Errorf("failed to read source fetch specs: %v", err)
+	}
+	return schedules, nil
+}
+
+// fetchReadySources runs the fetch action of every scheduled source whose
+// next update time has passed. Failures are logged and the source is skipped.
+func fetchReadySources(db DB) {
+	log.Printf("checking for sources to update")
+	schedules, err := getFetchSchedules(db)
+	if err != nil {
+		log.Printf("error: could not get fetch specs: %v", err)
+		return
+	}
+	now := time.Now().UTC()
+	for _, schedule := range schedules {
+		nextUpdate, err := GetNextUpdate(schedule.LastUpdated, schedule.Spec)
+		if err != nil {
+			log.Printf("error: could not determine next update for %s: %v", schedule.Source, err)
+			continue
+		}
+		if nextUpdate.After(now) {
+			log.Printf(
+				"%s: last update %s, next update %s",
+				schedule.Source,
+				schedule.LastUpdated.Format("2006-01-02 15:04:05"),
+				nextUpdate.Format("2006-01-02 15:04:05"),
+			)
+			continue
+		}
+		log.Printf("%s: fetching", schedule.Source)
+
+		state, envs, argv, postProcess, err := GetSourceActionInputs(db, schedule.Source, "fetch")
+		if err != nil {
+			log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
+			continue
+		}
+
+		items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", time.Minute, postProcess)
+		if err != nil {
+			AddErrorItem(db, errItem)
+			log.Printf("error: failed to execute fetch: %v", err)
+			continue
+		}
+
+		added, deleted, err := UpdateWithFetchedItems(db, schedule.Source, newState, items, time.Now())
+		if err != nil {
+			log.Printf("error: failed to update: %v", err)
+			continue
+		}
+		log.Printf("%s added %d items, updated %d items, and deleted %d items", schedule.Source, added, len(items)-added, deleted)
+	}
+}
+
+// BackgroundFetch fetches every source whose schedule is due, once
+// immediately and then again each time sleep elapses, until ctx is done.
+// It always returns ctx.Err(). sleep must be positive: time.NewTicker panics otherwise.
+func BackgroundFetch(ctx context.Context, db DB, sleep time.Duration) error {
+	log.Print("starting fetch monitor")
+	fetchReadySources(db)
+	tick := time.NewTicker(sleep)
+	// Stop the ticker on return so it does not outlive the monitor.
+	defer tick.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Printf("ending fetch monitor")
+			return ctx.Err()
+		case <-tick.C:
+			fetchReadySources(db)
+		}
+	}
+}
diff --git a/test/test_items.sh b/test/test_items.sh index 98228d9..8937e70 100755 --- a/test/test_items.sh +++ b/test/test_items.sh @@ -54,11 +54,12 @@ for i in $(seq 2 211); do done tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212" +# test auto update +tmp/intake source env -s nothing --set "INTAKE_FETCH=every 5m" +tmp/intake source add -s hello +tmp/intake action add -s hello -a fetch -- sh -c "cat /dev/random | tr -dc 'A-Za-z0-9' | head -c16 | jq -cR '{id: ., title: \"Hello\"}'" +tmp/intake source env -s hello --set "INTAKE_FETCH=every 1m" + # default password, comment out to test no password echo "hello" | tmp/intake passwd --stdin echo "hello" | tmp/intake passwd --stdin --verify - -# crontab integration -tmp/intake source env -s page --set "INTAKE_CRON=0 0 * * *" -tmp/intake source env -s spook --set "INTAKE_CRON=0 0 * * *" -tmp/intake source env -s feedtest --set "INTAKE_CRON=0 0 * * *" diff --git a/web/main.go b/web/main.go index 1c087f2..a3cb9b1 100644 --- a/web/main.go +++ b/web/main.go @@ -31,7 +31,7 @@ func handleFunc( http.HandleFunc(pattern, handler) } -func RunServer(db core.DB, addr string, port string) { +func RunServer(db core.DB, addr string, port string) error { env := &Env{db} bind := net.JoinHostPort(addr, port) @@ -55,7 +55,7 @@ func RunServer(db core.DB, addr string, port string) { handleFunc("POST /mass-deactivate", env.massDeactivate, env.authed, logged) log.Printf("listening on %s", bind) - log.Fatal(http.ListenAndServe(bind, nil)) + return http.ListenAndServe(bind, nil) } func getQueryInt(req *http.Request, name string, def int) int {