# HG changeset patch # User drewp@bigasterisk.com # Date 1725666362 25200 # Node ID 19e3def953e1ed0755f892a9fbe5a8a7462c656d # Parent 465defa3f956612517205e7f676ddf68599a41e4 rename so they sort next to each other diff -r 465defa3f956 -r 19e3def953e1 calsync/cal_sync.go --- a/calsync/cal_sync.go Fri Sep 06 16:43:31 2024 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,36 +0,0 @@ -package main - -import ( - "log" - - "bigasterisk.com/go/gcalendarwatch/convert" - "bigasterisk.com/go/gcalendarwatch/gcalclient" - "bigasterisk.com/go/gcalendarwatch/mongoclient" - M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" -) - -func updateMongoCalsToMatchGoogleOnce(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) { - log.Println("updateMongoCalsToMatchGoogle") - - seen := make(map[string]bool) - - cals, err := gc.AllCalendars() - if err != nil { - return err - } - - for _, serviceCal := range cals { - cal := convert.MongoCalFromGoogleCal(serviceCal) - log.Println(M.Prefix(cal), "syncing calendar doc") - seen[cal.Url] = true - err = mc.UpsertOneCal(cal) - if err != nil { - return err - } - } - - err = mc.DeleteCalsNotInSet(seen) - - log.Println("updateMongoCalsToMatchGoogle done") - return err -} diff -r 465defa3f956 -r 19e3def953e1 calsync/event_sync.go --- a/calsync/event_sync.go Fri Sep 06 16:43:31 2024 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,134 +0,0 @@ -package main - -import ( - "log" - "math/rand" - "time" - - "bigasterisk.com/go/gcalendarwatch/convert" - "bigasterisk.com/go/gcalendarwatch/gcalclient" - "bigasterisk.com/go/gcalendarwatch/mongoclient" - M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" - "bigasterisk.com/go/gcalendarwatch/notificationrouter" -) - -// Time that we'll still accept the old watchId after we've registered a new one. -const grace = 1 * time.Minute - -// Each calendar syncs like this: -// 1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`. -// 2. Garbage-collect all events last-modified before `now-initialSyncBack` -// 3. Continuous watch of each calendar to catch updates. -func updateMongoEventsToMatchGoogleForever( - mc *mongoclient.MongoClient, - gc *gcalclient.GCalClient, - initialSyncBack time.Duration, - initialSyncAhead time.Duration, - router *notificationrouter.NotificationRouter, -) error { - log.Println("starting updateMongoEventsToMatchGoogleForever") - - now := time.Now() - initialSyncT1 := now.Add(-initialSyncBack) - initialSyncT2 := now.Add(initialSyncAhead) - // Note that we could receive updates outside this interval. - - cals, err := mc.GetAllCals() - if err != nil { - return err - } - log.Println("syncing events from", len(cals), "calendars") - - for _, cal := range cals { - rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2) - go rd.watchForUpdates() - } - return nil -} - -type calEventsReader struct { - mc *mongoclient.MongoClient - gc *gcalclient.GCalClient - router *notificationrouter.NotificationRouter - cal M.MongoCal - t1, t2 time.Time - syncToken string -} - -func newCalEventsReader( - mc *mongoclient.MongoClient, - gc *gcalclient.GCalClient, - router *notificationrouter.NotificationRouter, - cal M.MongoCal, - t1, t2 time.Time, -) *calEventsReader { - return &calEventsReader{mc, gc, router, cal, t1, t2, ""} -} - -func (r *calEventsReader) watchForUpdates() { - jitteredStartup := time.Duration(rand.Int()%30) * time.Second - log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup) - time.Sleep(jitteredStartup) - - err := r.updateInitialRange() - if err != nil { - log.Fatal(err) - } - - jitter := time.Duration(rand.Intn(4)) * time.Minute - watchLifetime := time.Duration(3)*time.Minute + jitter - - syncFunc := notificationrouter.SyncFunc(func() { - err := r.sync() - if err != nil { - log.Fatal(err) - } - }) - go r.renewWatchEventsForever(watchLifetime, syncFunc) -} - -func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) { - log.Println(M.Prefix(r.cal), "renewWatchEventsForever") - - for { - watchId := notificationrouter.NewWatchId() - jitterScale := rand.Float64()*1 + 0.5 - jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second - - r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace) - - err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace) - if err != nil { - log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err) - return - } - time.Sleep(jitteredLifetime) - log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring") - } -} - -func (r *calEventsReader) updateInitialRange() error { - events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) - r.syncToken = nextSyncToken - if err != nil { - return err - } - for _, ev := range events { - r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) - } - - return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) -} - -func (r *calEventsReader) sync() error { - events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken) - if err != nil { - return err - } - r.syncToken = nextSyncToken - for _, ev := range events { - r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) - } - - return nil -} diff -r 465defa3f956 -r 19e3def953e1 calsync/sync_cal.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/calsync/sync_cal.go Fri Sep 06 16:46:02 2024 -0700 @@ -0,0 +1,36 @@ +package main + +import ( + "log" + + "bigasterisk.com/go/gcalendarwatch/convert" + "bigasterisk.com/go/gcalendarwatch/gcalclient" + "bigasterisk.com/go/gcalendarwatch/mongoclient" + M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" +) + +func updateMongoCalsToMatchGoogleOnce(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) { + log.Println("updateMongoCalsToMatchGoogle") + + seen := make(map[string]bool) + + cals, err := gc.AllCalendars() + if err != nil { + return err + } + + for _, serviceCal := range cals { + cal := convert.MongoCalFromGoogleCal(serviceCal) + log.Println(M.Prefix(cal), "syncing calendar doc") + seen[cal.Url] = true + err = mc.UpsertOneCal(cal) + if err != nil { + return err + } + } + + err = mc.DeleteCalsNotInSet(seen) + + log.Println("updateMongoCalsToMatchGoogle done") + return err +} diff -r 465defa3f956 -r 19e3def953e1 calsync/sync_event.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/calsync/sync_event.go Fri Sep 06 16:46:02 2024 -0700 @@ -0,0 +1,134 @@ +package main + +import ( + "log" + "math/rand" + "time" + + "bigasterisk.com/go/gcalendarwatch/convert" + "bigasterisk.com/go/gcalendarwatch/gcalclient" + "bigasterisk.com/go/gcalendarwatch/mongoclient" + M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" + "bigasterisk.com/go/gcalendarwatch/notificationrouter" +) + +// Time that we'll still accept the old watchId after we've registered a new one. +const grace = 1 * time.Minute + +// Each calendar syncs like this: +// 1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`. +// 2. Garbage-collect all events last-modified before `now-initialSyncBack` +// 3. Continuous watch of each calendar to catch updates. +func updateMongoEventsToMatchGoogleForever( + mc *mongoclient.MongoClient, + gc *gcalclient.GCalClient, + initialSyncBack time.Duration, + initialSyncAhead time.Duration, + router *notificationrouter.NotificationRouter, +) error { + log.Println("starting updateMongoEventsToMatchGoogleForever") + + now := time.Now() + initialSyncT1 := now.Add(-initialSyncBack) + initialSyncT2 := now.Add(initialSyncAhead) + // Note that we could receive updates outside this interval. + + cals, err := mc.GetAllCals() + if err != nil { + return err + } + log.Println("syncing events from", len(cals), "calendars") + + for _, cal := range cals { + rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2) + go rd.watchForUpdates() + } + return nil +} + +type calEventsReader struct { + mc *mongoclient.MongoClient + gc *gcalclient.GCalClient + router *notificationrouter.NotificationRouter + cal M.MongoCal + t1, t2 time.Time + syncToken string +} + +func newCalEventsReader( + mc *mongoclient.MongoClient, + gc *gcalclient.GCalClient, + router *notificationrouter.NotificationRouter, + cal M.MongoCal, + t1, t2 time.Time, +) *calEventsReader { + return &calEventsReader{mc, gc, router, cal, t1, t2, ""} +} + +func (r *calEventsReader) watchForUpdates() { + jitteredStartup := time.Duration(rand.Int()%30) * time.Second + log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup) + time.Sleep(jitteredStartup) + + err := r.updateInitialRange() + if err != nil { + log.Fatal(err) + } + + jitter := time.Duration(rand.Intn(4)) * time.Minute + watchLifetime := time.Duration(3)*time.Minute + jitter + + syncFunc := notificationrouter.SyncFunc(func() { + err := r.sync() + if err != nil { + log.Fatal(err) + } + }) + go r.renewWatchEventsForever(watchLifetime, syncFunc) +} + +func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) { + log.Println(M.Prefix(r.cal), "renewWatchEventsForever") + + for { + watchId := notificationrouter.NewWatchId() + jitterScale := rand.Float64()*1 + 0.5 + jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second + + r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace) + + err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace) + if err != nil { + log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err) + return + } + time.Sleep(jitteredLifetime) + log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring") + } +} + +func (r *calEventsReader) updateInitialRange() error { + events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) + r.syncToken = nextSyncToken + if err != nil { + return err + } + for _, ev := range events { + r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) + } + + return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) +} + +func (r *calEventsReader) sync() error { + events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken) + if err != nil { + return err + } + r.syncToken = nextSyncToken + for _, ev := range events { + r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) + } + + return nil +}