Mercurial > code > home > repos > gcalendarwatch
view calsync/sync_event.go @ 81:79320eff10f2
support status=="cancelled" to delete events
author | drewp@bigasterisk.com |
---|---|
date | Fri, 06 Sep 2024 18:26:13 -0700 |
parents | e8164bd2f9a1 |
children |
line wrap: on
line source
package main import ( "fmt" "log" "math/rand" "time" "bigasterisk.com/go/gcalendarwatch/convert" "bigasterisk.com/go/gcalendarwatch/gcalclient" "bigasterisk.com/go/gcalendarwatch/mongoclient" M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" "bigasterisk.com/go/gcalendarwatch/notificationrouter" ) // Time that we'll still accept the old watchId after we've registered a new one. const grace = 1 * time.Minute // How often to renew watches (roughly). const watchLifetime = time.Duration(120) * time.Minute // Each calendar syncs like this: // 1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`. // 2. Garbage-collect all events last-modified before `now-initialSyncBack` // 3. Continuous watch of each calendar to catch updates. func updateMongoEventsToMatchGoogleForever( mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, initialSyncBack time.Duration, initialSyncAhead time.Duration, router *notificationrouter.NotificationRouter, startupJitter time.Duration, ) error { log.Println("starting updateMongoEventsToMatchGoogleForever") now := time.Now() initialSyncT1 := now.Add(-initialSyncBack) initialSyncT2 := now.Add(initialSyncAhead) // Note that we could receive updates outside this interval. cals, err := mc.GetAllCals() if err != nil { return err } log.Println("syncing events from", len(cals), "calendars") for _, cal := range cals { rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2) go func() { err := rd.watchForUpdates(startupJitter) if err != nil { log.Println("ERROR", M.Prefix(rd.cal), "watchForUpdates: ", err, "(aborting this calendar)") } }() } return nil } type calEventsReader struct { mc *mongoclient.MongoClient gc *gcalclient.GCalClient router *notificationrouter.NotificationRouter cal M.MongoCal t1, t2 time.Time syncToken string } func newCalEventsReader( mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, router *notificationrouter.NotificationRouter, cal M.MongoCal, t1, t2 time.Time, ) *calEventsReader { return &calEventsReader{mc, gc, router, cal, t1, t2, ""} } func (r *calEventsReader) watchForUpdates(startupJitter time.Duration) error { r.watchForUpdatesJitteredStartup(startupJitter) err := r.updateInitialRange() if err != nil { return err } syncFunc := notificationrouter.SyncFunc(func() { err := r.sync() if err != nil { log.Fatalln("ERROR", M.Prefix(r.cal), "sync: ", err) } }) go r.renewWatchEventsForever(watchLifetime, syncFunc) return nil } func (r *calEventsReader) watchForUpdatesJitteredStartup(startupJitter time.Duration) { jitteredStartup := time.Duration(startupJitter.Seconds()*rand.Float64()) * time.Second log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup) time.Sleep(jitteredStartup) } func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) { log.Println(M.Prefix(r.cal), "renewWatchEventsForever") for { watchId := notificationrouter.NewWatchId() jitterScale := 0.5 + 1.0*rand.Float64() jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace) err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace) if err != nil { log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err) return } time.Sleep(jitteredLifetime) log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring") } } func (r *calEventsReader) updateInitialRange() error { events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) if err != nil { return err } r.syncToken = nextSyncToken err = r.upsertAndDeleteEvents(events) if err != nil { return err } return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) } func (r *calEventsReader) sync() error { events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken) if err != nil { return fmt.Errorf("ListEventUpdates: %v", err) } r.syncToken = nextSyncToken err = r.upsertAndDeleteEvents(events) if err != nil { return fmt.Errorf("upsertAndDeleteEvents: %v", err) } return nil } func (r *calEventsReader) upsertAndDeleteEvents(events []gcalclient.CalendarEvent) error { for _, ev := range events { if ev.Status == "cancelled" { err := r.mc.DeleteEvent(ev.EventUrl) if err != nil { return err } continue } err := r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) if err != nil { return err } } return nil }