335 lines
9.6 KiB
Go
335 lines
9.6 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// cronSpec is a parsed schedule specification that can report when it next
// occurs.
type cronSpec interface {
	// GetNextTime returns the next time matching this spec that is after
	// the base time.
	GetNextTime(base time.Time) (nextUpdate time.Time)
}
|
|
|
|
func GetNextSpecTime(lastUpdated time.Time, spec string) (nextUpdate time.Time, err error) {
|
|
var cron cronSpec
|
|
cron, err = ParseCronSpec(spec)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
return cron.GetNextTime(lastUpdated), nil
|
|
}
|
|
|
|
func ParseCronSpec(spec string) (cron cronSpec, err error) {
|
|
switch {
|
|
case strings.HasPrefix(spec, "every "):
|
|
cron, err = parseEverySpec(spec[len("every "):])
|
|
case strings.HasPrefix(spec, "at "):
|
|
cron, err = parseAtSpec(spec[len("at "):])
|
|
case strings.HasPrefix(spec, "on "):
|
|
cron, err = parseOnSpec(spec[len("on "):])
|
|
default:
|
|
// Interpret "HH:MM" as "at HH:MM"
|
|
cron, err = parseAtSpec(spec)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unknown spec format: %v", spec)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// nextAfter returns the earliest of nextUpdates that is after base.
// If none of them is after base, the zero time is returned.
func nextAfter(base time.Time, nextUpdates []time.Time) (nextUpdate time.Time) {
	for _, next := range nextUpdates {
		if !next.After(base) {
			continue
		}
		// The zero value of nextUpdate means "nothing chosen yet".
		if nextUpdate.IsZero() || next.Before(nextUpdate) {
			nextUpdate = next
		}
	}
	return nextUpdate
}
|
|
|
|
// An every-spec is a Go duration string, e.g. "every 5m".
type everySpec struct {
	// duration is the interval between consecutive matching times.
	duration time.Duration
}
|
|
|
|
func parseEverySpec(spec string) (*everySpec, error) {
|
|
duration, err := time.ParseDuration(spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &everySpec{duration}, nil
|
|
}
|
|
|
|
// Get the next time matching the every-spec that is after the base time.
|
|
func (spec *everySpec) GetNextTime(base time.Time) (nextUpdate time.Time) {
|
|
next := base.Round(spec.duration)
|
|
if !next.After(base) {
|
|
next = next.Add(spec.duration)
|
|
}
|
|
return next
|
|
}
|
|
|
|
// An at-spec is in the pattern HH:MM[,HH:MM[,...]].
//
// hours and minutes are parallel slices: the i-th time of day is
// hours[i]:minutes[i].
type atSpec struct {
	hours   []int
	minutes []int
}
|
|
|
|
func parseAtSpec(spec string) (*atSpec, error) {
|
|
var hours, minutes []int
|
|
for _, timeSpec := range strings.Split(spec, ",") {
|
|
var hour, minute int
|
|
_, err := fmt.Sscanf(timeSpec, "%d:%d", &hour, &minute)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse at-spec %s: %w", timeSpec, err)
|
|
}
|
|
hours = append(hours, hour)
|
|
minutes = append(minutes, minute)
|
|
}
|
|
return &atSpec{hours, minutes}, nil
|
|
}
|
|
|
|
// Get the next time matching the at-spec that is after the base time.
|
|
func (spec *atSpec) GetNextTime(base time.Time) (nextUpdate time.Time) {
|
|
nextUpdates := []time.Time{}
|
|
for i, hour := range spec.hours {
|
|
minute := spec.minutes[i]
|
|
// The time instance on the same day as the base time
|
|
specOfDay := time.Date(base.Year(), base.Month(), base.Day(), hour, minute, 0, 0, base.Location())
|
|
// Bump it forward one day if it's before the base time
|
|
if !specOfDay.After(base) {
|
|
specOfDay = specOfDay.Add(24 * time.Hour)
|
|
}
|
|
nextUpdates = append(nextUpdates, specOfDay)
|
|
}
|
|
return nextAfter(base, nextUpdates)
|
|
}
|
|
|
|
// weekdays maps three-letter weekday abbreviations to their time.Weekday
// values. Keys are case-sensitive.
var weekdays = map[string]time.Weekday{
	"Sun": time.Sunday,
	"Mon": time.Monday,
	"Tue": time.Tuesday,
	"Wed": time.Wednesday,
	"Thu": time.Thursday,
	"Fri": time.Friday,
	"Sat": time.Saturday,
}
|
|
|
|
// ymd is a calendar date with no time of day or location attached.
type ymd struct {
	Year  int
	Month time.Month
	Day   int
}
|
|
|
|
type onSpecDate interface {
|
|
getDates(base time.Time) []ymd
|
|
}
|
|
|
|
// onSpecWeekday is an on-spec date entry that names a day of the week.
type onSpecWeekday struct {
	weekday time.Weekday
}
|
|
|
|
func (date *onSpecWeekday) getDates(base time.Time) (dates []ymd) {
|
|
// For a weekday, add the next of that weekday 0-6 days ahead and 7-13 days ahead.
|
|
// The first date ensures that we don't miss multiple updates on the same day of the week
|
|
// (e.g. "on Sun at 06:00,18:00") and the second date ensures that we don't get stuck with
|
|
// only a date in the past (e.g. "on Sun at 02:00" when base is Sun 03:00).
|
|
daysForward := (int(date.weekday) - int(base.Weekday()) + 7) % 7 // value in [0,6]
|
|
day0_6 := base.AddDate(0, 0, daysForward)
|
|
dates = append(dates, ymd{day0_6.Year(), day0_6.Month(), day0_6.Day()})
|
|
day7_13 := base.AddDate(0, 0, daysForward+7)
|
|
dates = append(dates, ymd{day7_13.Year(), day7_13.Month(), day7_13.Day()})
|
|
return
|
|
}
|
|
|
|
// onSpecEveryMonth is an on-spec date entry for a given day of every month.
type onSpecEveryMonth struct {
	day int
}
|
|
|
|
func (date *onSpecEveryMonth) getDates(base time.Time) (dates []ymd) {
|
|
// For every-month, add the date for the current month and the date for the next month.
|
|
dates = append(dates, ymd{base.Year(), base.Month(), date.day})
|
|
nextMonth := base.AddDate(0, 1, 0)
|
|
dates = append(dates, ymd{nextMonth.Year(), nextMonth.Month(), date.day})
|
|
return
|
|
}
|
|
|
|
// onSpecMonthDay is an on-spec date entry for a specific month and day,
// written M/D in the spec.
type onSpecMonthDay struct {
	month int
	day   int
}
|
|
|
|
func (date *onSpecMonthDay) getDates(base time.Time) (dates []ymd) {
|
|
// For month/day, add the date for the base year and the next year.
|
|
dates = append(dates, ymd{base.Year(), time.Month(date.month), date.day})
|
|
dates = append(dates, ymd{base.Year() + 1, time.Month(date.month), date.day})
|
|
return
|
|
}
|
|
|
|
// An on-spec is in the pattern DOW[,DOW[...]] where DOW is an abbreviated weekday
|
|
// or M/D[,M/D[...]] where M/D is a month and day.
|
|
// As a special case, "*/N" matches the Nth day of every month.
|
|
// An on-spec may be followed by an at-spec; otherwise, "at 00:00" is implied.
|
|
type onSpec struct {
|
|
atSpec atSpec
|
|
dates []onSpecDate
|
|
}
|
|
|
|
func parseOnSpec(spec string) (*onSpec, error) {
|
|
atPart := "00:00"
|
|
if at := strings.Index(spec, " at "); at > -1 {
|
|
atPart = spec[at+len(" at "):]
|
|
spec = spec[:at]
|
|
}
|
|
atSpec, err := parseAtSpec(atPart)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse at-spec %v: %w", atSpec, err)
|
|
}
|
|
|
|
var dates []onSpecDate
|
|
for _, daySpec := range strings.Split(spec, ",") {
|
|
if weekday, ok := weekdays[daySpec]; ok {
|
|
dates = append(dates, &onSpecWeekday{weekday})
|
|
} else if strings.HasPrefix(daySpec, "*/") {
|
|
var day int
|
|
_, err := fmt.Sscanf(daySpec, "*/%d", &day)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse month/day %s: %v", daySpec, err)
|
|
}
|
|
dates = append(dates, &onSpecEveryMonth{day})
|
|
} else {
|
|
var month, day int
|
|
_, err := fmt.Sscanf(daySpec, "%d/%d", &month, &day)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not parse month/day %s: %v", daySpec, err)
|
|
}
|
|
dates = append(dates, &onSpecMonthDay{month, day})
|
|
}
|
|
}
|
|
return &onSpec{*atSpec, dates}, nil
|
|
}
|
|
|
|
// Get the next time matching the on-spec that is after the base time.
|
|
func (spec *onSpec) GetNextTime(base time.Time) (nextUpdate time.Time) {
|
|
// Convert each type of on-spec date spec to a concrete YMD date
|
|
var dates []ymd
|
|
for _, date := range spec.dates {
|
|
dates = append(dates, date.getDates(base)...)
|
|
}
|
|
|
|
// Then, for each date, create a datetime for each at-spec.
|
|
nextUpdates := []time.Time{}
|
|
for _, date := range dates {
|
|
for i, atHour := range spec.atSpec.hours {
|
|
atMinute := spec.atSpec.minutes[i]
|
|
nextUpdates = append(nextUpdates, time.Date(date.Year, date.Month, date.Day, atHour, atMinute, 0, 0, base.Location()))
|
|
}
|
|
}
|
|
return nextAfter(base, nextUpdates)
|
|
}
|
|
|
|
// Schedule pairs a source with its fetch spec and the time it was last
// updated.
type Schedule struct {
	// Source is the name of the source.
	Source string
	// Spec is the schedule spec string, as accepted by ParseCronSpec.
	Spec string
	// LastUpdated is when the source was last updated.
	LastUpdated time.Time
}
|
|
|
|
func getFetchSchedules(db DB) (schedules []Schedule, err error) {
|
|
res, err := db.Query(`
|
|
select envs.source, envs.value, sources.lastUpdated
|
|
from envs
|
|
inner join sources on envs.source = sources.name
|
|
where envs.name = 'INTAKE_FETCH'
|
|
`)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get source fetch specs: %v", err)
|
|
}
|
|
for res.Next() {
|
|
var source, value string
|
|
var lastUpdated int
|
|
if err = res.Scan(&source, &value, &lastUpdated); err != nil {
|
|
return nil, fmt.Errorf("failed to scan source fetch spec: %v", err)
|
|
}
|
|
schedules = append(schedules, Schedule{source, value, time.Unix(int64(lastUpdated), 0).UTC()})
|
|
}
|
|
return
|
|
}
|
|
|
|
func fetchReadySources(db DB) {
|
|
log.Printf("checking for sources to update")
|
|
schedules, err := getFetchSchedules(db)
|
|
if err != nil {
|
|
log.Printf("error: could not get fetch specs: %v", err)
|
|
return
|
|
}
|
|
now := time.Now().UTC()
|
|
for _, schedule := range schedules {
|
|
nextUpdate, err := GetNextSpecTime(schedule.LastUpdated, schedule.Spec)
|
|
if err != nil {
|
|
log.Printf("error: could not determine next update for %s: %v", schedule.Source, err)
|
|
continue
|
|
}
|
|
if nextUpdate.After(now) {
|
|
// TODO log this at a debug log level
|
|
// log.Printf(
|
|
// "%s: last update %s, next update %s",
|
|
// schedule.Source,
|
|
// schedule.LastUpdated.Format("2006-01-02 15:04:05"),
|
|
// nextUpdate.Format("2006-01-02 15:04:05"),
|
|
// )
|
|
continue
|
|
}
|
|
log.Printf("%s: fetching", schedule.Source)
|
|
|
|
duration := DefaultTimeout
|
|
timeout, err := GetSourceTimeout(db, schedule.Source)
|
|
if err != nil {
|
|
log.Printf("error: failed to get source timeout, using default: %v", err)
|
|
} else if timeout != "" {
|
|
duration, err = time.ParseDuration(timeout)
|
|
if err != nil {
|
|
log.Printf("error: invalid duration: %v", err)
|
|
duration = DefaultTimeout
|
|
}
|
|
}
|
|
|
|
state, envs, argv, err := GetSourceActionInputs(db, schedule.Source, "fetch")
|
|
if err != nil {
|
|
log.Printf("error: failed to load data for %s: %v", schedule.Source, err)
|
|
continue
|
|
}
|
|
|
|
items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", duration)
|
|
if err != nil {
|
|
AddErrorItem(db, errItem)
|
|
log.Printf("error: failed to execute fetch: %v", err)
|
|
continue
|
|
}
|
|
|
|
added, deleted, err := UpdateWithFetchedItems(db, schedule.Source, newState, items, time.Now())
|
|
if err != nil {
|
|
log.Printf("error: failed to update: %v", err)
|
|
continue
|
|
}
|
|
log.Printf("%s added %d items, updated %d items, and deleted %d items", schedule.Source, added, len(items)-added, deleted)
|
|
}
|
|
}
|
|
|
|
func BackgroundFetch(ctx context.Context, db DB, sleep time.Duration) error {
|
|
log.Print("starting fetch monitor")
|
|
fetchReadySources(db)
|
|
tick := time.NewTicker(sleep)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("ending fetch monitor")
|
|
return ctx.Err()
|
|
case <-tick.C:
|
|
fetchReadySources(db)
|
|
}
|
|
}
|
|
}
|