view calsync/sync_event.go @ 69:72021e6e5c54

logging
author drewp@bigasterisk.com
date Fri, 06 Sep 2024 17:30:16 -0700
parents 19e3def953e1
children 5354739c9b0a
line wrap: on
line source

package main

import (
	"log"
	"math/rand"
	"time"

	"bigasterisk.com/go/gcalendarwatch/convert"
	"bigasterisk.com/go/gcalendarwatch/gcalclient"
	"bigasterisk.com/go/gcalendarwatch/mongoclient"
	M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes"
	"bigasterisk.com/go/gcalendarwatch/notificationrouter"
)

// grace is added to each watch's lifetime when the watch and its handler are
// registered, so the old watchId is still accepted for a while after the
// renewal loop has moved on to registering a new one.
const grace = time.Minute

// updateMongoEventsToMatchGoogleForever starts one background reader per
// calendar returned by mc.GetAllCals and then returns without waiting for
// those readers.
//
// Each calendar syncs like this:
//  1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`.
//  2. Garbage-collect all events last-modified before `now-initialSyncBack`
//  3. Continuous watch of each calendar to catch updates.
//
// The only error returned is the one from mc.GetAllCals; in that case no
// readers are started.
func updateMongoEventsToMatchGoogleForever(
	mc *mongoclient.MongoClient,
	gc *gcalclient.GCalClient,
	initialSyncBack time.Duration,
	initialSyncAhead time.Duration,
	router *notificationrouter.NotificationRouter,
) error {
	log.Println("starting updateMongoEventsToMatchGoogleForever")

	// Every calendar shares one window, anchored at a single reading of the
	// clock. Note that we could receive updates outside this interval.
	startedAt := time.Now()
	windowBegin := startedAt.Add(-initialSyncBack)
	windowEnd := startedAt.Add(initialSyncAhead)

	calendars, err := mc.GetAllCals()
	if err != nil {
		return err
	}
	log.Println("syncing events from", len(calendars), "calendars")

	for _, c := range calendars {
		reader := newCalEventsReader(mc, gc, router, c, windowBegin, windowEnd)
		go reader.watchForUpdates()
	}
	return nil
}

// calEventsReader bundles the clients and per-calendar state used to sync a
// single calendar's events.
type calEventsReader struct {
	// Clients for mongo and Google Calendar, plus the router that watch
	// handlers are registered with.
	mc     *mongoclient.MongoClient
	gc     *gcalclient.GCalClient
	router *notificationrouter.NotificationRouter

	// The calendar this reader is responsible for.
	cal M.MongoCal

	// Bounds of the initial full-sync range (start, then end).
	t1 time.Time
	t2 time.Time

	// Token returned by the most recent successful listing; handed back to
	// the Google client on the next incremental listing.
	syncToken string
}

// newCalEventsReader returns a reader for cal whose initial sync range runs
// from t1 to t2. The reader's sync token starts out empty.
func newCalEventsReader(
	mc *mongoclient.MongoClient,
	gc *gcalclient.GCalClient,
	router *notificationrouter.NotificationRouter,
	cal M.MongoCal,
	t1, t2 time.Time,
) *calEventsReader {
	return &calEventsReader{
		mc:     mc,
		gc:     gc,
		router: router,
		cal:    cal,
		t1:     t1,
		t2:     t2,
		// syncToken is left at its zero value, "".
	}
}

// watchForUpdates sleeps for a random 0-29 seconds, runs the initial range
// sync, and then launches the watch-renewal loop in its own goroutine before
// returning.
//
// A failed initial sync ends the whole process via log.Fatal. The same is true
// of a failed r.sync() inside the callback that is handed to
// renewWatchEventsForever.
func (r *calEventsReader) watchForUpdates() {
	// Stagger start-up so the readers do not all begin at the same moment.
	startDelay := time.Second * time.Duration(rand.Int()%30)
	log.Println(M.Prefix(r.cal), "watchForEvents starting in", startDelay)
	time.Sleep(startDelay)

	if err := r.updateInitialRange(); err != nil {
		log.Fatal(err)
	}

	// Base watch lifetime: a whole number of minutes from 3 through 6.
	baseLifetime := time.Duration(3+rand.Intn(4)) * time.Minute

	onNotify := func() {
		if err := r.sync(); err != nil {
			log.Fatal(err)
		}
	}
	go r.renewWatchEventsForever(baseLifetime, notificationrouter.SyncFunc(onNotify))
}

// renewWatchEventsForever keeps registering fresh watches for r.cal.
//
// Each round it creates a new watch id and scales lifetime by a random factor
// in [0.5, 1.5), truncated to whole seconds. It then registers syncFunc with
// the router and calls r.gc.WatchEvents, passing that scaled lifetime plus
// grace to both. Finally it sleeps for the scaled lifetime alone, so the next
// round begins grace earlier than the duration given to the previous round.
//
// The loop ends, and the method returns, the first time WatchEvents reports an
// error; the error is logged rather than returned.
func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) {
	log.Println(M.Prefix(r.cal), "renewWatchEventsForever")

	for {
		id := notificationrouter.NewWatchId()

		scale := 0.5 + rand.Float64()
		roundLifetime := time.Duration(lifetime.Seconds()*scale) * time.Second
		expiry := roundLifetime + grace

		r.router.AddHandler(id, syncFunc, expiry)

		if err := r.gc.WatchEvents(&r.cal, id, expiry); err != nil {
			log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err)
			return
		}

		time.Sleep(roundLifetime)
		log.Println(M.Prefix(r.cal), "watch id=", id, "is expiring")
	}
}

// updateInitialRange performs the one-time full sync for this calendar.
//
// It lists events between r.t1 and r.t2 with r.gc.ListEventsInRange, stores
// the returned sync token in r.syncToken, upserts every listed event through
// r.mc, and finally calls r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1).
//
// It returns the listing error if there is one (leaving r.syncToken
// untouched), otherwise the result of DeleteEventsUpdatedBefore.
func (r *calEventsReader) updateInitialRange() error {
	events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2)
	if err != nil {
		return err
	}
	// Adopt the token only after a successful listing, matching sync(); a
	// failed call must not overwrite the stored token.
	r.syncToken = nextSyncToken

	for _, ev := range events {
		// NOTE(review): any result of UpsertOneEvent is discarded here —
		// confirm whether it can report a failure that should be surfaced.
		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/))
	}

	return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1)
}

// sync fetches the event updates for r.cal since r.syncToken and upserts each
// one through r.mc.
//
// On a listing error it returns that error and leaves r.syncToken as it was.
// Otherwise it stores the new token before applying the updates and returns
// nil.
func (r *calEventsReader) sync() error {
	changed, token, err := r.gc.ListEventUpdates(r.cal, r.syncToken)
	if err != nil {
		return err
	}
	r.syncToken = token

	for _, gev := range changed {
		stamp := time.Now() /*todo*/
		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&gev, stamp))
	}
	return nil
}