diff calsync/event_sync.go @ 61:8aee4f5c4bdd

receive notifications and route them to calendar sync functions
author drewp@bigasterisk.com
date Fri, 06 Sep 2024 16:43:14 -0700
parents 3b0595c2bf03
children
line wrap: on
line diff
--- a/calsync/event_sync.go	Fri Sep 06 16:41:48 2024 -0700
+++ b/calsync/event_sync.go	Fri Sep 06 16:43:14 2024 -0700
@@ -2,28 +2,35 @@
 
 import (
 	"log"
+	"math/rand"
 	"time"
 
 	"bigasterisk.com/go/gcalendarwatch/convert"
 	"bigasterisk.com/go/gcalendarwatch/gcalclient"
 	"bigasterisk.com/go/gcalendarwatch/mongoclient"
 	M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes"
+	"bigasterisk.com/go/gcalendarwatch/notificationrouter"
 )
 
+// Time that we'll still accept the old watchId after we've registered a new one.
+const grace = 1 * time.Minute
+
 // Each calendar syncs like this:
-//  1. Full sync of events taking place between `now-keepHistory` to `now+syncAhead`.
-//  2. Garbage-collect all events last-modified before `now-keepHistory`
+//  1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`.
+//  2. Garbage-collect all events last-modified before `now-initialSyncBack`
 //  3. Continuous watch of each calendar to catch updates.
 func updateMongoEventsToMatchGoogleForever(
 	mc *mongoclient.MongoClient,
 	gc *gcalclient.GCalClient,
-	keepHistory time.Duration,
-	syncAhead time.Duration) error {
+	initialSyncBack time.Duration,
+	initialSyncAhead time.Duration,
+	router *notificationrouter.NotificationRouter,
+) error {
 	log.Println("starting updateMongoEventsToMatchGoogleForever")
 
 	now := time.Now()
-	syncStart := now.Add(-keepHistory)
-	syncEnd := now.Add(syncAhead)
+	initialSyncT1 := now.Add(-initialSyncBack)
+	initialSyncT2 := now.Add(initialSyncAhead)
 	// Note that we could receive updates outside this interval.
 
 	cals, err := mc.GetAllCals()
@@ -33,8 +40,8 @@
 	log.Println("syncing events from", len(cals), "calendars")
 
 	for _, cal := range cals {
-		rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd)
-		go rd.updateForever()
+		rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2)
+		go rd.watchForUpdates()
 	}
 	return nil
 }
@@ -42,29 +49,61 @@
 type calEventsReader struct {
 	mc        *mongoclient.MongoClient
 	gc        *gcalclient.GCalClient
+	router    *notificationrouter.NotificationRouter
 	cal       M.MongoCal
 	t1, t2    time.Time
 	syncToken string
 }
 
-func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal M.MongoCal, t1, t2 time.Time) *calEventsReader {
-	return &calEventsReader{mc, gc, cal, t1, t2, ""}
+func newCalEventsReader(
+	mc *mongoclient.MongoClient,
+	gc *gcalclient.GCalClient,
+	router *notificationrouter.NotificationRouter,
+	cal M.MongoCal,
+	t1, t2 time.Time,
+) *calEventsReader {
+	return &calEventsReader{mc, gc, router, cal, t1, t2, ""}
 }
 
-func (r *calEventsReader) updateForever() {
+func (r *calEventsReader) watchForUpdates() {
+	jitteredStartup := time.Duration(rand.Int()%30) * time.Second
+	log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup)
+	time.Sleep(jitteredStartup)
+
 	err := r.updateInitialRange()
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	for {
-		log.Println("syncing", r.cal.Summary, "with sync", r.syncToken)
-		err = r.sync()
+	jitter := time.Duration(rand.Intn(4)) * time.Minute
+	watchLifetime := time.Duration(3)*time.Minute + jitter
+
+	syncFunc := notificationrouter.SyncFunc(func() {
+		err := r.sync()
 		if err != nil {
 			log.Fatal(err)
 		}
+	})
+	go r.renewWatchEventsForever(watchLifetime, syncFunc)
+}
 
-		time.Sleep(10 * time.Second)
+func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) {
+	log.Println(M.Prefix(r.cal), "renewWatchEventsForever")
+
+	for {
+		watchId := notificationrouter.NewWatchId()
+		jitterScale := rand.Float64()*1 + 0.5
+		jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second
+
+		r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace)
+
+		err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace)
+		if err != nil {
+			log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err)
+			return
+		}
+		time.Sleep(jitteredLifetime)
+		log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring")
 	}
 }