view calsync/sync_event.go @ 81:79320eff10f2

support status=="cancelled" to delete events
author drewp@bigasterisk.com
date Fri, 06 Sep 2024 18:26:13 -0700
parents e8164bd2f9a1
children
line wrap: on
line source

package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"

	"bigasterisk.com/go/gcalendarwatch/convert"
	"bigasterisk.com/go/gcalendarwatch/gcalclient"
	"bigasterisk.com/go/gcalendarwatch/mongoclient"
	M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes"
	"bigasterisk.com/go/gcalendarwatch/notificationrouter"
)

const (
	// grace is how long the old watchId is still accepted after a new one
	// has been registered.
	grace = time.Minute

	// watchLifetime is (roughly) how often watches get renewed.
	watchLifetime = 120 * time.Minute
)

// updateMongoEventsToMatchGoogleForever launches one background goroutine per
// calendar returned by mc.GetAllCals and then returns.
//
// Each calendar syncs like this:
//  1. Full sync of events taking place between `now-initialSyncBack` and `now+initialSyncAhead`.
//  2. Garbage-collect all events last-modified before `now-initialSyncBack`.
//  3. Continuous watch of each calendar to catch updates.
//
// The returned error is non-nil only when the calendar list can't be fetched.
// A failure inside a calendar's goroutine is logged there and ends the work
// for that calendar only.
func updateMongoEventsToMatchGoogleForever(
	mc *mongoclient.MongoClient,
	gc *gcalclient.GCalClient,
	initialSyncBack time.Duration,
	initialSyncAhead time.Duration,
	router *notificationrouter.NotificationRouter,
	startupJitter time.Duration,
) error {
	log.Println("starting updateMongoEventsToMatchGoogleForever")

	// Window for the initial full sync. Note that we could receive updates
	// outside this interval.
	start := time.Now()
	windowBegin, windowEnd := start.Add(-initialSyncBack), start.Add(initialSyncAhead)

	cals, err := mc.GetAllCals()
	if err != nil {
		return err
	}
	log.Println("syncing events from", len(cals), "calendars")

	for _, cal := range cals {
		// The reader is built here, in the calling goroutine, and handed to
		// the new goroutine as an argument.
		go func(rd *calEventsReader) {
			if err := rd.watchForUpdates(startupJitter); err != nil {
				log.Println("ERROR", M.Prefix(rd.cal), "watchForUpdates: ", err, "(aborting this calendar)")
			}
		}(newCalEventsReader(mc, gc, router, cal, windowBegin, windowEnd))
	}
	return nil
}

// calEventsReader holds the clients and sync state used to keep one
// calendar's events in mongo up to date.
type calEventsReader struct {
	// mc is the mongo client that events are upserted into and deleted from.
	mc *mongoclient.MongoClient

	// gc is the Google Calendar client used to list events and register watches.
	gc *gcalclient.GCalClient

	// router is where the per-watch sync handlers get registered.
	router *notificationrouter.NotificationRouter

	// cal is the calendar this reader is responsible for.
	cal M.MongoCal

	// t1 is the start of the initial sync range; it is also the cutoff passed
	// to DeleteEventsUpdatedBefore.
	t1 time.Time

	// t2 is the end of the initial sync range.
	t2 time.Time

	// syncToken is the token returned by the most recent list call; it is
	// passed to the next ListEventUpdates call.
	syncToken string
}

// newCalEventsReader builds a reader for cal whose initial sync range is
// t1..t2. The sync token starts out empty.
func newCalEventsReader(
	mc *mongoclient.MongoClient,
	gc *gcalclient.GCalClient,
	router *notificationrouter.NotificationRouter,
	cal M.MongoCal,
	t1, t2 time.Time,
) *calEventsReader {
	return &calEventsReader{
		mc:        mc,
		gc:        gc,
		router:    router,
		cal:       cal,
		t1:        t1,
		t2:        t2,
		syncToken: "",
	}
}

// watchForUpdates sleeps for a random startup delay (bounded by
// startupJitter), runs the initial range sync, and then starts a goroutine
// that keeps renewing the events watch for this calendar.
//
// It returns the error from the initial range sync, if any. Once the renewal
// goroutine is started it returns nil without waiting for it.
func (r *calEventsReader) watchForUpdates(startupJitter time.Duration) error {
	r.watchForUpdatesJitteredStartup(startupJitter)

	if err := r.updateInitialRange(); err != nil {
		return err
	}

	// Handler given to renewWatchEventsForever. Note that a failed sync ends
	// the whole process: log.Fatalln exits after logging.
	onNotify := notificationrouter.SyncFunc(func() {
		if err := r.sync(); err != nil {
			log.Fatalln("ERROR", M.Prefix(r.cal), "sync: ", err)
		}
	})

	go r.renewWatchEventsForever(watchLifetime, onNotify)
	return nil
}

// watchForUpdatesJitteredStartup blocks for a random delay in
// [0, startupJitter), logging the chosen delay first.
func (r *calEventsReader) watchForUpdatesJitteredStartup(startupJitter time.Duration) {
	// Scale the jitter, expressed in seconds, by a random factor in [0, 1).
	// The float-to-Duration conversion drops the fractional part, so the
	// delay is always a whole number of seconds.
	wholeSeconds := time.Duration(startupJitter.Seconds() * rand.Float64())
	delay := wholeSeconds * time.Second

	log.Println(M.Prefix(r.cal), "watchForEvents starting in", delay)
	time.Sleep(delay)
}

// renewWatchEventsForever loops, registering a fresh watch on each pass:
// it makes a new watch id, adds syncFunc to the router under that id, asks
// the calendar client to watch events with the same id, and then sleeps for
// a jittered lifetime before doing it all again.
//
// Both registrations are given the jittered lifetime plus grace, while the
// sleep is only the jittered lifetime, so the next pass begins grace before
// that duration has elapsed.
//
// The loop ends (and the function returns) only when WatchEvents fails; the
// failure is logged and no further renewals happen for this calendar.
func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) {
	log.Println(M.Prefix(r.cal), "renewWatchEventsForever")

	for {
		id := notificationrouter.NewWatchId()

		// Random factor in [0.5, 1.5), applied to the lifetime in seconds;
		// the result is truncated to whole seconds.
		scale := rand.Float64() + 0.5
		sleepFor := time.Duration(lifetime.Seconds()*scale) * time.Second
		registeredFor := sleepFor + grace

		r.router.AddHandler(id, syncFunc, registeredFor)

		if err := r.gc.WatchEvents(&r.cal, id, registeredFor); err != nil {
			log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err)
			return
		}

		time.Sleep(sleepFor)
		log.Println(M.Prefix(r.cal), "watch id=", id, "is expiring")
	}
}

// updateInitialRange performs the initial full sync for this calendar:
//  1. lists the events in the range r.t1..r.t2 and stores the returned sync token,
//  2. upserts/deletes those events in mongo,
//  3. deletes events updated before r.t1.
//
// Any failure is returned wrapped with the name of the step that failed, so
// the caller's log line shows where the initial sync stopped. The underlying
// error stays reachable through errors.Is / errors.As.
func (r *calEventsReader) updateInitialRange() error {
	events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2)
	if err != nil {
		return fmt.Errorf("ListEventsInRange: %w", err)
	}
	r.syncToken = nextSyncToken

	if err := r.upsertAndDeleteEvents(events); err != nil {
		return fmt.Errorf("upsertAndDeleteEvents: %w", err)
	}

	if err := r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1); err != nil {
		return fmt.Errorf("DeleteEventsUpdatedBefore: %w", err)
	}
	return nil
}

// sync applies an incremental update: it asks the calendar client for the
// event changes since r.syncToken, stores the newly returned token, and then
// upserts/deletes the changed events in mongo.
//
// Errors are returned prefixed with the failing step. They are wrapped with
// %w (not flattened with %v) so callers can still inspect the cause with
// errors.Is / errors.As; the rendered message is unchanged.
func (r *calEventsReader) sync() error {
	events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken)
	if err != nil {
		return fmt.Errorf("ListEventUpdates: %w", err)
	}
	r.syncToken = nextSyncToken

	if err := r.upsertAndDeleteEvents(events); err != nil {
		return fmt.Errorf("upsertAndDeleteEvents: %w", err)
	}
	return nil
}

// upsertAndDeleteEvents writes a batch of Google events through to mongo, in
// order. An event whose Status is "cancelled" is deleted by its EventUrl;
// every other event is converted and upserted.
//
// Processing stops at the first error, which is returned as-is; events
// earlier in the batch have already been applied at that point.
func (r *calEventsReader) upsertAndDeleteEvents(events []gcalclient.CalendarEvent) error {
	for i := range events {
		// Work on a copy so the converter is handed a pointer to a local
		// value, not to the caller's slice element.
		ev := events[i]

		var err error
		if ev.Status == "cancelled" {
			err = r.mc.DeleteEvent(ev.EventUrl)
		} else {
			err = r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/))
		}
		if err != nil {
			return err
		}
	}
	return nil
}