Mercurial > code > home > repos > gcalendarwatch
diff calsync/event_sync.go @ 61:8aee4f5c4bdd
receive notifications and route them to calendar sync functions
author | drewp@bigasterisk.com |
---|---|
date | Fri, 06 Sep 2024 16:43:14 -0700 |
parents | 3b0595c2bf03 |
children |
line wrap: on
line diff
--- a/calsync/event_sync.go Fri Sep 06 16:41:48 2024 -0700 +++ b/calsync/event_sync.go Fri Sep 06 16:43:14 2024 -0700 @@ -2,28 +2,35 @@ import ( "log" + "math/rand" "time" "bigasterisk.com/go/gcalendarwatch/convert" "bigasterisk.com/go/gcalendarwatch/gcalclient" "bigasterisk.com/go/gcalendarwatch/mongoclient" M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" + "bigasterisk.com/go/gcalendarwatch/notificationrouter" ) +// Time that we'll still accept the old watchId after we've registered a new one. +const grace = 1 * time.Minute + // Each calendar syncs like this: -// 1. Full sync of events taking place between `now-keepHistory` to `now+syncAhead`. -// 2. Garbage-collect all events last-modified before `now-keepHistory` +// 1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`. +// 2. Garbage-collect all events last-modified before `now-initialSyncBack` // 3. Continuous watch of each calendar to catch updates. func updateMongoEventsToMatchGoogleForever( mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, - keepHistory time.Duration, - syncAhead time.Duration) error { + initialSyncBack time.Duration, + initialSyncAhead time.Duration, + router *notificationrouter.NotificationRouter, +) error { log.Println("starting updateMongoEventsToMatchGoogleForever") now := time.Now() - syncStart := now.Add(-keepHistory) - syncEnd := now.Add(syncAhead) + initialSyncT1 := now.Add(-initialSyncBack) + initialSyncT2 := now.Add(initialSyncAhead) // Note that we could receive updates outside this interval. cals, err := mc.GetAllCals() @@ -33,8 +40,8 @@ log.Println("syncing events from", len(cals), "calendars") for _, cal := range cals { - rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd) - go rd.updateForever() + rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2) + go rd.watchForUpdates() } return nil } @@ -42,29 +49,61 @@ type calEventsReader struct { mc *mongoclient.MongoClient gc *gcalclient.GCalClient + router *notificationrouter.NotificationRouter cal M.MongoCal t1, t2 time.Time syncToken string } -func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal M.MongoCal, t1, t2 time.Time) *calEventsReader { - return &calEventsReader{mc, gc, cal, t1, t2, ""} +func newCalEventsReader( + mc *mongoclient.MongoClient, + gc *gcalclient.GCalClient, + router *notificationrouter.NotificationRouter, + cal M.MongoCal, + t1, t2 time.Time, +) *calEventsReader { + return &calEventsReader{mc, gc, router, cal, t1, t2, ""} } -func (r *calEventsReader) updateForever() { +func (r *calEventsReader) watchForUpdates() { + jitteredStartup := time.Duration(rand.Int()%30) * time.Second + log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup) + time.Sleep(jitteredStartup) + err := r.updateInitialRange() if err != nil { log.Fatal(err) } - for { - log.Println("syncing", r.cal.Summary, "with sync", r.syncToken) - err = r.sync() + jitter := time.Duration(rand.Intn(4)) * time.Minute + watchLifetime := time.Duration(3)*time.Minute + jitter + + syncFunc := notificationrouter.SyncFunc(func() { + err := r.sync() if err != nil { log.Fatal(err) } + }) + go r.renewWatchEventsForever(watchLifetime, syncFunc) +} - time.Sleep(10 * time.Second) +func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) { + log.Println(M.Prefix(r.cal), "renewWatchEventsForever") + + for { + watchId := notificationrouter.NewWatchId() + jitterScale := rand.Float64()*1 + 0.5 + jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second + + r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace) + + err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace) + if err != nil { + log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err) + return + } + time.Sleep(jitteredLifetime) + log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring") } }