changeset 56:635ff76f867c

WIP: rewrite: process load+sync in parallel between cals; simplify a lot
author drewp@bigasterisk.com
date Thu, 05 Sep 2024 13:50:40 -0700
parents 627c815f83bb
children 24f662799710
files calsync/cal_sync.go calsync/convert/convert.go calsync/event_sync.go calsync/gcalclient/event_requests.go calsync/gcalclient/gcalclient.go calsync/main.go calsync/mongoclient/events_collection.go
diffstat 7 files changed, 212 insertions(+), 154 deletions(-) [+]
line wrap: on
line diff
--- a/calsync/cal_sync.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/cal_sync.go	Thu Sep 05 13:50:40 2024 -0700
@@ -8,12 +8,12 @@
 	"bigasterisk.com/go/gcalendarwatch/mongoclient"
 )
 
-func updateMongoCalsToMatchGoogle(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) {
+func updateMongoCalsToMatchGoogleOnce(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) {
 	log.Println("updateMongoCalsToMatchGoogle")
 
 	seen := make(map[string]bool)
 
-	cals, err := gc.AllCalendars(100)
+	cals, err := gc.AllCalendars()
 	if err != nil {
 		return err
 	}
@@ -22,8 +22,15 @@
 		calUrl := gcalclient.MakeCalUrl(cal.Id)
 		log.Println("syncing", calUrl)
 		seen[calUrl] = true
-		mc.UpsertOneCal(convert.MongoCalFromGoogleCal(cal))
+		err = mc.UpsertOneCal(convert.MongoCalFromGoogleCal(cal))
+		if err != nil {
+			return err
+		}
+		log.Println("syncing", calUrl, "done")
 	}
 
-	return mc.DeleteCalsNotInSet(seen)
+	err = mc.DeleteCalsNotInSet(seen)
+
+	log.Println("updateMongoCalsToMatchGoogle done")
+	return err
 }
--- a/calsync/convert/convert.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/convert/convert.go	Thu Sep 05 13:50:40 2024 -0700
@@ -19,7 +19,6 @@
 }
 
 func MongoEventFromGoogleEvent2(
-	calUrl string,
 	ev *gcalclient.CalendarEvent,
 	now time.Time,
 ) mongoclient.MongoEvent {
--- a/calsync/event_sync.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/event_sync.go	Thu Sep 05 13:50:40 2024 -0700
@@ -9,46 +9,67 @@
 	"bigasterisk.com/go/gcalendarwatch/mongoclient"
 )
 
-// Runs forever. Applies to all calendars.
-func updateMongoEventsToMatchGoogle(
-	mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) error {
-	t := time.Now()
-	eventUpdates := make(chan *gcalclient.FindEventsMessage)
+// Each calendar syncs like this:
+//  1. Full sync of events taking place between `now-keepHistory` to `now+syncAhead`.
+//  2. Garbage-collect all events last-modified before `now-keepHistory`
+//  3. Continuous watch of each calendar to catch updates.
+func updateMongoEventsToMatchGoogleForever(
+	mc *mongoclient.MongoClient,
+	gc *gcalclient.GCalClient,
+	keepHistory time.Duration,
+	syncAhead time.Duration) error {
+
+	now := time.Now()
+	syncStart := now.Add(-keepHistory)
+	syncEnd := now.Add(syncAhead)
+	// Note that we could receive updates outside this interval.
 
-	updateRoutine(eventUpdates, gc, mc)
+	cals, err := mc.GetAllCals()
+	if err != nil {
+		return err
+	}
+	log.Println("syncing events from", len(cals), "calendars")
+
+	readerErr := make(chan error)
+	for _, cal := range cals {
+		rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd)
+		go func() {
+			readerErr <- rd.updateForever()
+		}()
+	}
+	return <-readerErr
+}
 
-	for ev := range eventUpdates {
-		log.Println("up for ev", ev.CalId)
-		if ev.Event != nil {
-			mc.UpsertOneEvent(
-				convert.MongoEventFromGoogleEvent2(
-					ev.Event.CalendarUrl,
-					ev.Event,
-					/*modSince=*/ t,
-				),
-			)
-		} else {
-			log.Println("cal", ev.CalId, "ready for cleanup")
-			log.Println("t=", t)
-			mc.DeleteEventsUpdatedBefore(t)
-		}
+type calEventsReader struct {
+	mc  *mongoclient.MongoClient
+	gc  *gcalclient.GCalClient
+	cal mongoclient.MongoCal
+	t1  time.Time
+	t2  time.Time
+}
+
+func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal mongoclient.MongoCal, t1 time.Time, t2 time.Time) *calEventsReader {
+	return &calEventsReader{mc, gc, cal, t1, t2}
+}
+
+func (r *calEventsReader) updateForever() error {
+
+	var syncToken string
+	items, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2)
+	if err != nil {
+		return err
 	}
-	log.Fatalln("updateMongoEventsToMatchGoogle done??")
+	for _, item := range items {
+		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&item.Event, time.Now() /*todo*/))
+		syncToken = item.SyncToken
+	}
+
+	r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1)
+
+	for _, item := range r.gc.ListEventUpdatesForever(syncToken) {
+		// r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(item.ev, time.Now()/*todo*/))
+		syncToken = item.SyncToken
+		_ = syncToken
+	}
 	return nil
 }
-
-func updateRoutine(
-	eventUpdates chan *gcalclient.FindEventsMessage,
-	gc *gcalclient.GCalClient,
-	mc *mongoclient.MongoClient,
-) {
-	defer close(eventUpdates)
-
-	t := time.Now()
-	err := gc.FindEvents(mc, t.AddDate(0, 0, -3), t.AddDate(0, 0, 7), eventUpdates)
-	if err != nil {
-		log.Println(err)
-		return
-	}
-	log.Println("updateRoutine done")
-}
--- a/calsync/gcalclient/event_requests.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/gcalclient/event_requests.go	Thu Sep 05 13:50:40 2024 -0700
@@ -6,7 +6,7 @@
 	"google.golang.org/api/calendar/v3"
 )
 
-const pageSize = 4
+const pageSize = 20
 
 func rangedEventsCall(srv *calendar.Service, calGoogleId string,
 	initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall {
--- a/calsync/gcalclient/gcalclient.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/gcalclient/gcalclient.go	Thu Sep 05 13:50:40 2024 -0700
@@ -1,7 +1,14 @@
 package gcalclient
 
+/*
+Note: this module keeps gcal *paging* private (we fetch all the pages), but
+*sync* is public. At least, callers may receive a syncToken and have to pass it
+back in.
+*/
 import (
 	"context"
+	"crypto/md5"
+	"fmt"
 	"log"
 	"net/url"
 	"strings"
@@ -45,9 +52,9 @@
 	// todo: disconnect watches if possible
 }
 
-func (gc *GCalClient) AllCalendars(maxResults int64) ([]*calendar.CalendarListEntry, error) {
+func (gc *GCalClient) AllCalendars() ([]*calendar.CalendarListEntry, error) {
 	// todo: pagination
-	list, err := gc.srv.CalendarList.List().MaxResults(maxResults).Do()
+	list, err := gc.srv.CalendarList.List().MaxResults( /*maxResults*/ 100).Do()
 	if err != nil {
 		return nil, err
 	}
@@ -69,123 +76,136 @@
 	list.Items = ret
 }
 
-type FindEventsMessage struct {
-	// either non-nil this:
-	Event *CalendarEvent
-	// or these:
-	CalId                    string
-	OlderThanThisIsDeletable time.Time
+// // FindEvents considers all calendars. It spawns routines, does some initial sync, and returns.
+// func (gc *GCalClient) FindEvents(
+// 	mc *mongoclient.MongoClient,
+// 	// For each calendar, after events in this time range have been sent to
+// 	// `out`, the chan will get the other kind of FindEventsMessage (CalId,
+// 	// ...). That message signals that the caller may cull old events on the given
+// 	// calendar. After that point, all events will be updates (including
+// 	// deletes).
+// 	initialFillStart, initialFillEnd time.Time,
+// 	out chan *FindEventsMessage,
+// ) error {
+// 	cals, err := mc.GetAllCals()
+// 	if err != nil {
+// 		return err
+// 	}
+// 	log.Println("reading", len(cals), "calendars")
+// 	for calNum, cal := range cals {
+// 		go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out)
+// 	}
+
+// 	return nil
+// }
+
+// func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool {
+// 	t := time.Now()
+// 	log.Println("  cal", calNum, cal.Url)
+// 	log.Println("  cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd)
+// 	syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out)
+// 	if err != nil {
+// 		log.Panicln(err)
+// 		return true
+// 	}
+
+// 	out <- &FindEventsMessage{nil, cal.GoogleId, t}
+
+// 	ew := gc.NewEventWatch(&cal, t, syncToken)
+
+// 	for loop := 0; ; loop++ {
+// 		log.Println("")
+// 		log.Println("tail loop", loop, "for", cal.Url)
+// 		err := ew.GetMoreEvents(out)
+// 		if err != nil {
+// 			log.Panicln(err)
+// 			return true
+// 		}
+// 		time.Sleep(10 * time.Minute)
+// 	}
+
+// }
+
+func shortDebugHash(pageToken string) string {
+	if pageToken == "" {
+		return "(empty)"
+	}
+	return fmt.Sprintf("%x", md5.Sum([]byte(pageToken)))
 }
 
-// FindEvents considers all calendars. It spawns routines, does some initial sync, and returns.
-func (gc *GCalClient) FindEvents(
-	mc *mongoclient.MongoClient,
-	// For each calendar, after events in this time range have been sent to
-	// `out`, the chan will get the other kind of FindEventsMessage (CalId,
-	// ...). That message signals that the caller may cull old events on the given
-	// calendar. After that point, all events will be updates (including
-	// deletes).
-	initialFillStart, initialFillEnd time.Time,
-	out chan *FindEventsMessage,
-) error {
-	cals, err := mc.GetAllCals()
-	if err != nil {
-		return err
-	}
-	log.Println("reading", len(cals), "calendars")
-	for calNum, cal := range cals {
-		go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out)
-	}
-
-	return nil
+type EventResult struct {
+	Event     CalendarEvent
+	SyncToken string
 }
 
-func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool {
-	t := time.Now()
-	log.Println("  cal", calNum, cal.Url)
-	log.Println("  cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd)
-	syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out)
+func (gc *GCalClient) ListEventsInRange(cal mongoclient.MongoCal, t1, t2 time.Time) (
+	[]EventResult, error) {
+	ret := make([]EventResult, 0)
+	mongoclient.LogWithCal(cal, "ListEventsInRange", t1, "to", t2)
+
+	syncToken := ""
+	var err error
+	var events []CalendarEvent
+
+	events, syncToken, err = readEventsPages(gc, cal, t1, t2)
 	if err != nil {
 		log.Panicln(err)
-		return true
+		return ret, err
+	}
+	for _, e := range events {
+		ret = append(ret, EventResult{e, syncToken})
 	}
 
-	out <- &FindEventsMessage{nil, cal.GoogleId, t}
-
-	ew := gc.NewEventWatch(&cal, t, syncToken)
+	return ret, nil
+}
 
-	for loop := 0; ; loop++ {
-		log.Println("")
-		log.Println("tail loop", loop, "for", cal.Url)
-		err := ew.GetMoreEvents(out)
-		if err != nil {
-			log.Panicln(err)
-			return true
-		}
-		time.Sleep(10 * time.Minute)
+func (gc *GCalClient) ListEventUpdatesForever(syncToken string) []EventResult {
+	for {
+		time.Sleep(5 * time.Minute)
 	}
 
 }
 
-// Synchronous.
-func (gc *GCalClient) readEventsInRange(cal *mongoclient.MongoCal, t1, t2 time.Time, out chan *FindEventsMessage) (string, error) {
-	log.Println("    get initial events for", cal.Url, "between", t1, "and", t2)
+func readEventsPages(gc *GCalClient, cal mongoclient.MongoCal, t1, t2 time.Time) (
+	events []CalendarEvent, syncToken string, err error) {
+
+	events = make([]CalendarEvent, 0)
+	syncToken = ""
+	err = nil
 
 	pageToken := ""
-	syncToken := ""
-	var err error
-	var done bool
 	for {
-		done, syncToken, pageToken, err = readEventsPage(gc, cal, t1, t2, pageToken, out)
-		if err != nil {
-			log.Panicln(err)
-			return "", err
+		mongoclient.LogWithCal(cal, "getting another page", shortDebugHash(pageToken))
+		pageResult, err2 := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do()
+		if err2 != nil {
+			log.Fatal(err2)
+			return nil, "", err2
 		}
-		if done {
+
+		mongoclient.LogWithCal(cal, "got page with", len(pageResult.Items), "events")
+		if len(pageResult.Items) == 0 {
 			break
 		}
-	}
-	return syncToken, nil
-}
-
-func readEventsPage(gc *GCalClient, cal *mongoclient.MongoCal, t1, t2 time.Time, pageToken string, out chan *FindEventsMessage) (done bool, syncToken, nextPageToken string, err error) {
-	log.Println("      getting another page", pageToken)
-	events, err := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do()
-	if err != nil {
-		return
-	}
-
-	log.Println("        got", len(events.Items), "events, sync=", events.NextSyncToken)
-	if len(events.Items) == 0 {
-		done = true
-		return
-	}
-
-	sendEvents(events, cal, out)
 
-	syncToken = events.NextSyncToken
-	if events.NextPageToken == "" {
-		done = true
-		return
-	}
-	nextPageToken = events.NextPageToken
-	syncToken = events.NextSyncToken
-	return
-}
+		for _, ev := range pageResult.Items {
+			if ev.Status == "cancelled" {
+				log.Fatal("todo")
+			}
+			events = append(events, CalendarEvent{
+				Event:       ev,
+				CalendarUrl: cal.Url,
+				EventUrl:    MakeEventUrl(cal.Url, ev.Id),
+			})
+		}
 
-// Send a page of calendar.Events over a channel, as CalendarEvent structs.
-func sendEvents(events *calendar.Events, cal *mongoclient.MongoCal, out chan *FindEventsMessage) {
-	for _, event := range events.Items {
-		if event.Status == "cancelled" {
-			log.Fatal("todo")
+		syncToken = pageResult.NextSyncToken
+		if pageResult.NextPageToken == "" {
+			break
 		}
-		out <- &FindEventsMessage{
-			Event: &CalendarEvent{
-				Event:       event,
-				CalendarUrl: cal.Url,
-				EventUrl:    MakeEventUrl(cal.Url, event.Id),
-			}}
+		pageToken = pageResult.NextPageToken
 	}
+	mongoclient.LogWithCal(cal, "total events read: ", len(events), "with syncToken", syncToken)
+	return events, syncToken, nil
 }
 
 type eventWatch struct {
@@ -206,7 +226,7 @@
 }
 
 // Call this when there are likely new changes to sync.
-func (w *eventWatch) GetMoreEvents(out chan *FindEventsMessage) error {
+func (w *eventWatch) GetMoreEvents() error {
 	call := syncEventsCall(w.gc.srv, w.cal.GoogleId)
 
 	log.Println("listing events on", w.cal.GoogleId, "with")
@@ -227,7 +247,7 @@
 	w.nextSyncToken = events.NextSyncToken
 	w.nextPageToken = events.NextPageToken
 	log.Println(len(events.Items), "more events received")
-	sendEvents(events, w.cal, out)
+	// sendEvents(events, w.cal, out)
 	log.Println("got nextSyncToken=", w.nextSyncToken, " nextPageToken=", w.nextPageToken)
 	return err
 }
--- a/calsync/main.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/main.go	Thu Sep 05 13:50:40 2024 -0700
@@ -20,6 +20,7 @@
 	"context"
 	"log"
 	"net/http"
+	"time"
 
 	"bigasterisk.com/go/gcalendarwatch/gcalclient"
 	"bigasterisk.com/go/gcalendarwatch/mongoclient"
@@ -27,8 +28,8 @@
 )
 
 func main() {
-	_=http.StripPrefix
-	_=mux.NewRouter
+	_ = http.StripPrefix
+	_ = mux.NewRouter
 
 	ctx := context.Background()
 
@@ -45,17 +46,20 @@
 	}
 	defer mc.Close()
 
-	err = updateMongoCalsToMatchGoogle(mc, gc)
+	// todo: if a cal is deleted, nothing touches its db events ever again.
+	// err = updateMongoCalsToMatchGoogleOnce(mc, gc)
+	// if err != nil {
+	// 	log.Fatal(err)
+	// }
+
+	err = updateMongoEventsToMatchGoogleForever(mc, gc,
+		time.Duration(7*24)*time.Hour, 
+		time.Duration(14*24)*time.Hour)
 	if err != nil {
 		log.Fatal(err)
 	}
+	log.Fatal("err was nil: updateMongoEventsToMatchGoogleForever shouldn't have returned")
 
-	err = updateMongoEventsToMatchGoogle(mc, gc)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	
 	/*
 		------------------
 		connect to mongodb with these ops:
--- a/calsync/mongoclient/events_collection.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/mongoclient/events_collection.go	Thu Sep 05 13:50:40 2024 -0700
@@ -8,6 +8,10 @@
 	"go.mongodb.org/mongo-driver/mongo/options"
 )
 
+func LogWithCal(cal MongoCal, msg ...interface{}) {
+	log.Println("cal:", (cal.GoogleId+"........")[:8], msg)
+}
+
 func (c *MongoClient) UpsertOneEvent(ev MongoEvent) error {
 	filter := bson.M{"_id": ev.Url}
 	setFields := ev
@@ -19,9 +23,12 @@
 	return nil
 }
 
-func (c *MongoClient) DeleteEventsUpdatedBefore(t time.Time) error {
-	res, err := c.eventsCollection.DeleteMany(c.ctx, bson.M{"lastUpdated": bson.M{"$lt": t}})
-	log.Println("deleted", res.DeletedCount, "events")
+func (c *MongoClient) DeleteEventsUpdatedBefore(cal MongoCal, t time.Time) error {
+	res, err := c.eventsCollection.DeleteMany(
+		c.ctx,
+		bson.M{"calendarUrl": cal.Url,
+			"lastUpdated": bson.M{"$lt": t}})
+	LogWithCal(cal, "deleted", res.DeletedCount, "events updated before", t)
 	if err != nil {
 		return err
 	}