package core

import (
	"context"
	"fmt"
	"log"
	"strings"
	"time"
)

// GetNextUpdate returns the earliest time strictly after lastUpdated that
// satisfies spec.
//
// spec must start with one of three prefixes:
//   - "every ": the remainder is a Go duration string (see parseEverySpec)
//   - "at ": the remainder is an at-spec (see parseAtSpec)
//   - "on ": the remainder is an on-spec (see parseOnSpec)
//
// An error is returned when the prefix is not recognized, when the remainder
// fails to parse, or when none of the candidate times produced by the parser
// is after lastUpdated.
func GetNextUpdate(lastUpdated time.Time, spec string) (nextUpdate time.Time, err error) {
	var nextUpdates []time.Time
	switch {
	case strings.HasPrefix(spec, "every "):
		nextUpdates, err = parseEverySpec(lastUpdated, strings.TrimPrefix(spec, "every "))
	case strings.HasPrefix(spec, "at "):
		nextUpdates, err = parseAtSpec(lastUpdated, strings.TrimPrefix(spec, "at "))
	case strings.HasPrefix(spec, "on "):
		nextUpdates, err = parseOnSpec(lastUpdated, strings.TrimPrefix(spec, "on "))
	default:
		return time.Time{}, fmt.Errorf("unknown spec format: %v", spec)
	}
	if err != nil {
		return time.Time{}, err
	}

	// The parsers may return candidates that are not after lastUpdated;
	// keep only the earliest one that is.
	for _, next := range nextUpdates {
		if next.After(lastUpdated) && (nextUpdate.IsZero() || next.Before(nextUpdate)) {
			nextUpdate = next
		}
	}

	// Without this guard a spec with no usable candidate would yield the zero
	// time and a nil error, which compares as "before now" and would make the
	// update look permanently overdue.
	if nextUpdate.IsZero() {
		return time.Time{}, fmt.Errorf("no update time after %v for spec: %v", lastUpdated, spec)
	}
	return nextUpdate, nil
}

// Get the next instance of the every-spec after the base time.
// An every-spec is a Go duration string and must be positive.
// The result is the first multiple of the duration (counted from the zero
// time, as Time.Round does) that is strictly after base.
func parseEverySpec(base time.Time, everySpec string) (nextUpdates []time.Time, err error) {
	duration, err := time.ParseDuration(everySpec)
	if err != nil {
		return nil, err
	}
	// Time.Round returns its receiver unchanged for d <= 0, so a non-positive
	// duration can never produce a time after base.
	if duration <= 0 {
		return nil, fmt.Errorf("every-spec duration must be positive: %v", everySpec)
	}

	next := base.Round(duration)
	// Round goes to the nearest multiple, which may be at or before base.
	if !next.After(base) {
		next = next.Add(duration)
	}
	return []time.Time{next}, nil
}

// Get the next instances of the at-spec times after the base time.
// An at-spec is in the pattern HH:MM[,HH:MM[,...]].
func parseAtSpec(base time.Time, atSpec string) (nextUpdates []time.Time, err error) { for _, timeSpec := range strings.Split(atSpec, ",") { var hour, minute int _, err = fmt.Sscanf(timeSpec, "%d:%d", &hour, &minute) if err != nil { return nil, fmt.Errorf("could not parse %s: %v", timeSpec, err) } // The time instance on the same day as the base time specOfDay := time.Date(base.Year(), base.Month(), base.Day(), hour, minute, 0, 0, base.Location()) // Bump it forward one day if it's before the base time if !specOfDay.After(base) { specOfDay = specOfDay.Add(24 * time.Hour) } nextUpdates = append(nextUpdates, specOfDay) } return } var weekdays = map[string]time.Weekday{ "Sun": time.Sunday, "Mon": time.Monday, "Tue": time.Tuesday, "Wed": time.Wednesday, "Thu": time.Thursday, "Fri": time.Friday, "Sat": time.Saturday, } // Get the next instances of the on-spec times after the base time. // An on-spec is in the pattern DOW[,DOW[...]] where DOW is an abbreviated weekday // or M/D[,M/D[...]] where M/D is a month and day. // As a special case, "*/N" matches the Nth day of every month. // An on-spec may be followed by an at-spec; otherwise, "at 00:00" is implied. func parseOnSpec(base time.Time, onSpec string) (nextUpdates []time.Time, err error) { type Date struct { Year int Month time.Month Day int } type Time struct { Hour int Minute int } atSpec := "00:00" if on := strings.Index(onSpec, " at "); on > -1 { atSpec = onSpec[on+len(" at "):] onSpec = onSpec[:on] } var atTimes []Time for _, timeSpec := range strings.Split(atSpec, ",") { var hour, minute int _, err := fmt.Sscanf(timeSpec, "%d:%d", &hour, &minute) if err != nil { return nil, fmt.Errorf("could not parse at-spec %s: %v", timeSpec, err) } atTimes = append(atTimes, Time{hour, minute}) } var dates []Date for _, daySpec := range strings.Split(onSpec, ",") { if weekday, ok := weekdays[daySpec]; ok { // For a weekday, add the next of that weekday 0-6 days ahead and 7-13 days ahead. 
// The first date ensures that we don't miss multiple updates on the same day of the week // (e.g. "on Sun at 06:00,18:00") and the second date ensures that we don't get stuck with // only a date in the past (e.g. "on Sun at 02:00" when base is Sun 03:00). daysForward := (int(weekday) - int(base.Weekday()) + 7) % 7 // value in [0,6] day0_6 := base.AddDate(0, 0, daysForward) dates = append(dates, Date{day0_6.Year(), day0_6.Month(), day0_6.Day()}) day7_13 := base.AddDate(0, 0, daysForward+7) dates = append(dates, Date{day7_13.Year(), day7_13.Month(), day7_13.Day()}) } else if strings.HasPrefix(daySpec, "*/") { // For every-month, add the date for the current month and the date for the next month. var day int _, err := fmt.Sscanf(daySpec, "*/%d", &day) if err != nil { return nil, fmt.Errorf("could not parse month/day %s: %v", daySpec, err) } dates = append(dates, Date{base.Year(), base.Month(), day}) nextMonth := base.AddDate(0, 1, 0) dates = append(dates, Date{nextMonth.Year(), nextMonth.Month(), day}) } else { // For month/day, add the date for the base year and the next year. var month, day int _, err := fmt.Sscanf(daySpec, "%d/%d", &month, &day) if err != nil { return nil, fmt.Errorf("could not parse month/day %s: %v", daySpec, err) } dates = append(dates, Date{base.Year(), time.Month(month), day}) dates = append(dates, Date{base.Year() + 1, time.Month(month), day}) } } // Now, for each date, create a datetime based on the at-spec. 
for _, date := range dates { for _, atTime := range atTimes { nextUpdates = append(nextUpdates, time.Date(date.Year, date.Month, date.Day, atTime.Hour, atTime.Minute, 0, 0, base.Location())) } } return } type Schedule struct { Source string Spec string LastUpdated time.Time } func getFetchSchedules(db DB) (schedules []Schedule, err error) { res, err := db.Query(` select envs.source, envs.value, sources.lastUpdated from envs inner join sources on envs.source = sources.name where envs.name = 'INTAKE_FETCH' `) if err != nil { return nil, fmt.Errorf("failed to get source fetch specs: %v", err) } for res.Next() { var source, value string var lastUpdated int if err = res.Scan(&source, &value, &lastUpdated); err != nil { return nil, fmt.Errorf("failed to scan source fetch spec: %v", err) } schedules = append(schedules, Schedule{source, value, time.Unix(int64(lastUpdated), 0).UTC()}) } return } func fetchReadySources(db DB) { log.Printf("checking for sources to update") schedules, err := getFetchSchedules(db) if err != nil { log.Printf("error: could not get fetch specs: %v", err) return } now := time.Now().UTC() for _, schedule := range schedules { nextUpdate, err := GetNextUpdate(schedule.LastUpdated, schedule.Spec) if err != nil { log.Printf("error: could not determine next update for %s: %v", schedule.Source, err) continue } if nextUpdate.After(now) { // TODO log this at a debug log level // log.Printf( // "%s: last update %s, next update %s", // schedule.Source, // schedule.LastUpdated.Format("2006-01-02 15:04:05"), // nextUpdate.Format("2006-01-02 15:04:05"), // ) continue } log.Printf("%s: fetching", schedule.Source) state, envs, argv, postProcess, err := GetSourceActionInputs(db, schedule.Source, "fetch") if err != nil { log.Printf("error: failed to load data for %s: %v", schedule.Source, err) continue } items, newState, errItem, err := Execute(schedule.Source, argv, envs, state, "", DefaultTimeout, postProcess) if err != nil { AddErrorItem(db, errItem) 
log.Printf("error: failed to execute fetch: %v", err) continue } added, deleted, err := UpdateWithFetchedItems(db, schedule.Source, newState, items, time.Now()) if err != nil { log.Printf("error: failed to update: %v", err) continue } log.Printf("%s added %d items, updated %d items, and deleted %d items", schedule.Source, added, len(items)-added, deleted) } } func BackgroundFetch(ctx context.Context, db DB, sleep time.Duration) error { log.Print("starting fetch monitor") fetchReadySources(db) tick := time.NewTicker(sleep) for { select { case <-ctx.Done(): log.Printf("ending fetch monitor") return ctx.Err() case <-tick.C: fetchReadySources(db) } } }