Implement background fetch

This commit is contained in:
Tim Van Baak 2025-02-20 07:18:51 -08:00
parent 7d317f47ea
commit 76353c4029
6 changed files with 179 additions and 8 deletions

View File

@ -61,6 +61,26 @@ Intake provides integration with `cron`.
To create a cron job for a source, set the `INTAKE_CRON` environment variable to a five-element crontab spec (e.g. `0 0 * * *`). To create a cron job for a source, set the `INTAKE_CRON` environment variable to a five-element crontab spec (e.g. `0 0 * * *`).
The `intake crontab` command will synchronize source cron jobs to your crontab. The `intake crontab` command will synchronize source cron jobs to your crontab.
Automatic fetching can be configured by setting the `INTAKE_FETCH` environment variable to a fetch schedule.
A fetch schedule may be:
- `every <duration>`, where `<duration>` is a Go duration string
- `at HH:MM[,HH:MM[...]]`, where HH:MM is an hour and minute
- `on DOW[,DOW[...]] [at ...]`, where DOW is an abbreviated weekday
- `on M/D[,M/D[...]] [at ...]`, where M/D is a month and day; the month may be `*` to match every month (e.g. `*/7`)
Examples:
| INTAKE_FETCH | Schedule |
| -------------------- | --------------------------------------------- |
| `every 5m` | Every 5 minutes (00:00, 00:05, ...) |
| `every 1d` | Once per day (at midnight) |
| `every 7d` | Once per week (at midnight Sunday) |
| `at 08:00` | Once per day at 08:00 |
| `at 06:00,18:00` | Twice per day at 6am and 6pm |
| `on Tue,Thu` | Twice a week, on Tue and Thu |
| `on Mon,Fri at 12:00`| Twice a week, at noon on Monday and Friday |
| `on 3/25` | Once a year on March 25 |
| `on */7` | Each month on the 7th |
### Action API ### Action API
The Intake action API defines how programs should behave to be used with Intake sources. The Intake action API defines how programs should behave to be used with Intake sources.
@ -106,6 +126,8 @@ Instead, the web interface can be locked behind a password set via `intake passw
Parity features Parity features
* [ ] source batching * [ ] source batching
* [ ] web source add
* [x] first-party replacement for cron
* [x] NixOS module * [x] NixOS module
* [x] NixOS vm demo * [x] NixOS vm demo

47
cmd/monitor.go Normal file
View File

@ -0,0 +1,47 @@
package cmd
import (
"context"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/spf13/cobra"
)
// monitorCmd defines the "monitor" subcommand. Its Run handler ignores the
// command-line arguments and delegates to monitor().
//
// NOTE(review): the help text describes <duration> as a "Go duration string",
// but the examples use "1d" and "7d"; the standard time.ParseDuration does not
// accept a day unit. Confirm the schedule parser handles days itself and
// consider adjusting the wording.
var monitorCmd = &cobra.Command{
Use: "monitor",
Short: "Run continuously and update sources according to their fetch schedule",
Long: `Run continuously and update sources.
Sources are monitored and fetched according to the schedule specified by
their INTAKE_FETCH environment variables. A schedule may be:
- "every <duration>", where <duration> is a Go duration string
- "at HH:MM[,HH:MM[...]]", where HH:MM is an hour and minute
- "on DOW[,DOW[...]] [at ...]", where DOW is an abbreviated weekday
- "on M/D[,M/D[...]] [at ...]", where M/D is a month and day
`,
Example: `
every 5m Every 5 minutes (00:00, 00:05, ...)
every 1d Once per day (at midnight)
every 7d Once per week (at midnight Sunday)
at 08:00 Once per day at 08:00
at 06:00,18:00 Twice per day at 6am and 6pm
on Tue,Thu Twice a week, on Tue and Thu
on Mon,Fri at 12:00 Twice a week, at noon on Monday and Friday
on 3/25 Once a year on March 25
on */7 Each month on the 7th
`,
Run: func(cmd *cobra.Command, args []string) {
monitor()
},
}
// init registers monitorCmd as a subcommand of rootCmd.
func init() {
rootCmd.AddCommand(monitorCmd)
}
func monitor() {
db := openAndMigrateDb()
core.BackgroundFetch(context.Background(), db, time.Minute)
}

View File

@ -1,6 +1,11 @@
package cmd package cmd
import ( import (
"context"
"log"
"time"
"github.com/Jaculabilis/intake/core"
"github.com/Jaculabilis/intake/web" "github.com/Jaculabilis/intake/web"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -24,5 +29,10 @@ func init() {
func serve(addr string, port string) { func serve(addr string, port string) {
db := openAndMigrateDb() db := openAndMigrateDb()
web.RunServer(db, addr, port)
go core.BackgroundFetch(context.Background(), db, time.Minute)
err := web.RunServer(db, addr, port)
log.Printf("error: server exited with err: %v", err)
} }

View File

@ -1,7 +1,9 @@
package core package core
import ( import (
"context"
"fmt" "fmt"
"log"
"strings" "strings"
"time" "time"
) )
@ -147,3 +149,92 @@ func parseOnSpec(base time.Time, onSpec string) (nextUpdates []time.Time, err er
} }
return return
} }
// Schedule pairs a source with its fetch schedule and the time the source was
// last updated.
type Schedule struct {
	// Source is the source name; Spec is the value of that source's
	// INTAKE_FETCH environment variable.
	Source, Spec string
	// LastUpdated is the time the source was last updated.
	LastUpdated time.Time
}
// getFetchSchedules returns one Schedule for every source that has an
// INTAKE_FETCH environment variable, joined with that source's last update
// time. The stored lastUpdated value is interpreted as Unix seconds and
// converted to a UTC time.
//
// On any query, scan, or iteration error it returns a nil slice and an error
// wrapping the cause.
func getFetchSchedules(db DB) (schedules []Schedule, err error) {
	res, err := db.Query(`
		select envs.source, envs.value, sources.lastUpdated
		from envs
		inner join sources on envs.source = sources.name
		where envs.name = 'INTAKE_FETCH'
	`)
	if err != nil {
		return nil, fmt.Errorf("failed to get source fetch specs: %w", err)
	}
	// Release the result set on every path, including the early return on a
	// scan error below.
	defer res.Close()

	for res.Next() {
		var source, value string
		var lastUpdated int64
		if err = res.Scan(&source, &value, &lastUpdated); err != nil {
			return nil, fmt.Errorf("failed to scan source fetch spec: %w", err)
		}
		schedules = append(schedules, Schedule{source, value, time.Unix(lastUpdated, 0).UTC()})
	}
	// Next also returns false when iteration fails part-way through; surface
	// that instead of returning a silently truncated list.
	if err = res.Err(); err != nil {
		return nil, fmt.Errorf("failed to read source fetch specs: %w", err)
	}
	return schedules, nil
}
// fetchReadySources runs the "fetch" action of every scheduled source whose
// next update time has been reached and stores the fetched items.
//
// Failures are logged rather than returned. A failure for one source does not
// stop the remaining sources from being processed.
func fetchReadySources(db DB) {
	log.Printf("checking for sources to update")
	schedules, err := getFetchSchedules(db)
	if err != nil {
		log.Printf("error: could not get fetch specs: %v", err)
		return
	}

	// Every source is compared against the same instant, captured once before
	// any fetch runs.
	now := time.Now().UTC()
	for _, schedule := range schedules {
		nextUpdate, err := GetNextUpdate(schedule.LastUpdated, schedule.Spec)
		if err != nil {
			log.Printf("error: could not determine next update for %s: %v", schedule.Source, err)
			continue
		}

		// Not due yet: report when it will be and move on.
		if nextUpdate.After(now) {
			log.Printf(
				"%s: last update %s, next update %s",
				schedule.Source,
				schedule.LastUpdated.Format("2006-01-02 15:04:05"),
				nextUpdate.Format("2006-01-02 15:04:05"),
			)
			continue
		}

		log.Printf("%s: fetching", schedule.Source)
		state, envs, argv, postProcess, err := GetSourceActionInputs(db, schedule.Source, "fetch")
		if err != nil {
			log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
			continue
		}

		// NOTE(review): the "" and time.Minute arguments are presumably the
		// action input and execution timeout; confirm against Execute.
		items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", time.Minute, postProcess)
		if err != nil {
			AddErrorItem(db, errItem)
			// Name the source so the failure can be attributed in the log.
			log.Printf("error: failed to execute fetch for %s: %v", schedule.Source, err)
			continue
		}

		added, deleted, err := UpdateWithFetchedItems(db, schedule.Source, newState, items, time.Now())
		if err != nil {
			log.Printf("error: failed to update %s: %v", schedule.Source, err)
			continue
		}

		// Fetched items that were not newly added are reported as updated.
		log.Printf("%s added %d items, updated %d items, and deleted %d items", schedule.Source, added, len(items)-added, deleted)
	}
}
// BackgroundFetch fetches all ready sources once immediately and then again
// every sleep interval until ctx is done.
//
// It blocks for the lifetime of ctx and always returns ctx.Err(). sleep must
// be greater than zero, because time.NewTicker panics otherwise.
func BackgroundFetch(ctx context.Context, db DB, sleep time.Duration) error {
	log.Print("starting fetch monitor")
	fetchReadySources(db)

	tick := time.NewTicker(sleep)
	// Release the ticker's resources once the monitor stops.
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Printf("ending fetch monitor")
			return ctx.Err()
		case <-tick.C:
			fetchReadySources(db)
		}
	}
}

View File

@ -54,11 +54,12 @@ for i in $(seq 2 211); do
done done
tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212" tmp/intake item add -s page --id 212 --title "Item 212" --body "This is the body of item 212"
# test auto update
tmp/intake source env -s nothing --set "INTAKE_FETCH=every 5m"
tmp/intake source add -s hello
# Each fetch emits a single item whose id is 16 random alphanumeric characters.
# Read /dev/urandom directly: unlike /dev/random it never blocks waiting for
# entropy, and no extra cat process is needed.
tmp/intake action add -s hello -a fetch -- sh -c "tr -dc 'A-Za-z0-9' </dev/urandom | head -c16 | jq -cR '{id: ., title: \"Hello\"}'"
tmp/intake source env -s hello --set "INTAKE_FETCH=every 1m"
# default password, comment out to test no password # default password, comment out to test no password
echo "hello" | tmp/intake passwd --stdin echo "hello" | tmp/intake passwd --stdin
echo "hello" | tmp/intake passwd --stdin --verify echo "hello" | tmp/intake passwd --stdin --verify
# crontab integration
tmp/intake source env -s page --set "INTAKE_CRON=0 0 * * *"
tmp/intake source env -s spook --set "INTAKE_CRON=0 0 * * *"
tmp/intake source env -s feedtest --set "INTAKE_CRON=0 0 * * *"

View File

@ -31,7 +31,7 @@ func handleFunc(
http.HandleFunc(pattern, handler) http.HandleFunc(pattern, handler)
} }
func RunServer(db core.DB, addr string, port string) { func RunServer(db core.DB, addr string, port string) error {
env := &Env{db} env := &Env{db}
bind := net.JoinHostPort(addr, port) bind := net.JoinHostPort(addr, port)
@ -55,7 +55,7 @@ func RunServer(db core.DB, addr string, port string) {
handleFunc("POST /mass-deactivate", env.massDeactivate, env.authed, logged) handleFunc("POST /mass-deactivate", env.massDeactivate, env.authed, logged)
log.Printf("listening on %s", bind) log.Printf("listening on %s", bind)
log.Fatal(http.ListenAndServe(bind, nil)) return http.ListenAndServe(bind, nil)
} }
func getQueryInt(req *http.Request, name string, def int) int { func getQueryInt(req *http.Request, name string, def int) int {