diff calsync/gcalclient/gcalclient.go @ 56:635ff76f867c

WIP: rewrite: process load+sync in parallel between cals; simplify a lot
author drewp@bigasterisk.com
date Thu, 05 Sep 2024 13:50:40 -0700
parents 627c815f83bb
children 24f662799710
line wrap: on
line diff
--- a/calsync/gcalclient/gcalclient.go	Tue Sep 03 14:50:20 2024 -0700
+++ b/calsync/gcalclient/gcalclient.go	Thu Sep 05 13:50:40 2024 -0700
@@ -1,7 +1,14 @@
 package gcalclient
 
+/*
+Note: this module keeps gcal *paging* private (we fetch all the pages), but
+*sync* is public, at least to the extent that callers may receive a syncToken
+and have to pass it back in.
+*/
 import (
 	"context"
+	"crypto/md5"
+	"fmt"
 	"log"
 	"net/url"
 	"strings"
@@ -45,9 +52,9 @@
 	// todo: disconnect watches if possible
 }
 
-func (gc *GCalClient) AllCalendars(maxResults int64) ([]*calendar.CalendarListEntry, error) {
+func (gc *GCalClient) AllCalendars() ([]*calendar.CalendarListEntry, error) {
 	// todo: pagination
-	list, err := gc.srv.CalendarList.List().MaxResults(maxResults).Do()
+	list, err := gc.srv.CalendarList.List().MaxResults( /*maxResults*/ 100).Do()
 	if err != nil {
 		return nil, err
 	}
@@ -69,123 +76,136 @@
 	list.Items = ret
 }
 
-type FindEventsMessage struct {
-	// either non-nil this:
-	Event *CalendarEvent
-	// or these:
-	CalId                    string
-	OlderThanThisIsDeletable time.Time
+// // FindEvents considers all calendars. It spawns routines, does some initial sync, and returns.
+// func (gc *GCalClient) FindEvents(
+// 	mc *mongoclient.MongoClient,
+// 	// For each calendar, after events in this time range have been sent to
+// 	// `out`, the chan will get the other kind of FindEventsMessage (CalId,
+// 	// ...). That message signals that the caller may cull old events on the given
+// 	// calendar. After that point, all events will be updates (including
+// 	// deletes).
+// 	initialFillStart, initialFillEnd time.Time,
+// 	out chan *FindEventsMessage,
+// ) error {
+// 	cals, err := mc.GetAllCals()
+// 	if err != nil {
+// 		return err
+// 	}
+// 	log.Println("reading", len(cals), "calendars")
+// 	for calNum, cal := range cals {
+// 		go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out)
+// 	}
+
+// 	return nil
+// }
+
+// func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool {
+// 	t := time.Now()
+// 	log.Println("  cal", calNum, cal.Url)
+// 	log.Println("  cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd)
+// 	syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out)
+// 	if err != nil {
+// 		log.Panicln(err)
+// 		return true
+// 	}
+
+// 	out <- &FindEventsMessage{nil, cal.GoogleId, t}
+
+// 	ew := gc.NewEventWatch(&cal, t, syncToken)
+
+// 	for loop := 0; ; loop++ {
+// 		log.Println("")
+// 		log.Println("tail loop", loop, "for", cal.Url)
+// 		err := ew.GetMoreEvents(out)
+// 		if err != nil {
+// 			log.Panicln(err)
+// 			return true
+// 		}
+// 		time.Sleep(10 * time.Minute)
+// 	}
+
+// }
+
+func shortDebugHash(pageToken string) string {
+	if pageToken == "" {
+		return "(empty)"
+	}
+	return fmt.Sprintf("%x", md5.Sum([]byte(pageToken)))
 }
 
-// FindEvents considers all calendars. It spawns routines, does some initial sync, and returns.
-func (gc *GCalClient) FindEvents(
-	mc *mongoclient.MongoClient,
-	// For each calendar, after events in this time range have been sent to
-	// `out`, the chan will get the other kind of FindEventsMessage (CalId,
-	// ...). That message signals that the caller may cull old events on the given
-	// calendar. After that point, all events will be updates (including
-	// deletes).
-	initialFillStart, initialFillEnd time.Time,
-	out chan *FindEventsMessage,
-) error {
-	cals, err := mc.GetAllCals()
-	if err != nil {
-		return err
-	}
-	log.Println("reading", len(cals), "calendars")
-	for calNum, cal := range cals {
-		go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out)
-	}
-
-	return nil
+type EventResult struct {
+	Event     CalendarEvent
+	SyncToken string
 }
 
-func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool {
-	t := time.Now()
-	log.Println("  cal", calNum, cal.Url)
-	log.Println("  cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd)
-	syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out)
+func (gc *GCalClient) ListEventsInRange(cal mongoclient.MongoCal, t1, t2 time.Time) (
+	[]EventResult, error) {
+	ret := make([]EventResult, 0)
+	mongoclient.LogWithCal(cal, "ListEventsInRange", t1, "to", t2)
+
+	syncToken := ""
+	var err error
+	var events []CalendarEvent
+
+	events, syncToken, err = readEventsPages(gc, cal, t1, t2)
 	if err != nil {
 		log.Panicln(err)
-		return true
+		return ret, err
+	}
+	for _, e := range events {
+		ret = append(ret, EventResult{e, syncToken})
 	}
 
-	out <- &FindEventsMessage{nil, cal.GoogleId, t}
-
-	ew := gc.NewEventWatch(&cal, t, syncToken)
+	return ret, nil
+}
 
-	for loop := 0; ; loop++ {
-		log.Println("")
-		log.Println("tail loop", loop, "for", cal.Url)
-		err := ew.GetMoreEvents(out)
-		if err != nil {
-			log.Panicln(err)
-			return true
-		}
-		time.Sleep(10 * time.Minute)
+func (gc *GCalClient) ListEventUpdatesForever(syncToken string) []EventResult {
+	for {
+		time.Sleep(5 * time.Minute)
 	}
 
 }
 
-// Synchronous.
-func (gc *GCalClient) readEventsInRange(cal *mongoclient.MongoCal, t1, t2 time.Time, out chan *FindEventsMessage) (string, error) {
-	log.Println("    get initial events for", cal.Url, "between", t1, "and", t2)
+func readEventsPages(gc *GCalClient, cal mongoclient.MongoCal, t1, t2 time.Time) (
+	events []CalendarEvent, syncToken string, err error) {
+
+	events = make([]CalendarEvent, 0)
+	syncToken = ""
+	err = nil
 
 	pageToken := ""
-	syncToken := ""
-	var err error
-	var done bool
 	for {
-		done, syncToken, pageToken, err = readEventsPage(gc, cal, t1, t2, pageToken, out)
-		if err != nil {
-			log.Panicln(err)
-			return "", err
+		mongoclient.LogWithCal(cal, "getting another page", shortDebugHash(pageToken))
+		pageResult, err2 := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do()
+		if err2 != nil {
+			log.Fatal(err2)
+			return nil, "", err2
 		}
-		if done {
+
+		mongoclient.LogWithCal(cal, "got page with", len(pageResult.Items), "events")
+		if len(pageResult.Items) == 0 {
 			break
 		}
-	}
-	return syncToken, nil
-}
-
-func readEventsPage(gc *GCalClient, cal *mongoclient.MongoCal, t1, t2 time.Time, pageToken string, out chan *FindEventsMessage) (done bool, syncToken, nextPageToken string, err error) {
-	log.Println("      getting another page", pageToken)
-	events, err := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do()
-	if err != nil {
-		return
-	}
-
-	log.Println("        got", len(events.Items), "events, sync=", events.NextSyncToken)
-	if len(events.Items) == 0 {
-		done = true
-		return
-	}
-
-	sendEvents(events, cal, out)
 
-	syncToken = events.NextSyncToken
-	if events.NextPageToken == "" {
-		done = true
-		return
-	}
-	nextPageToken = events.NextPageToken
-	syncToken = events.NextSyncToken
-	return
-}
+		for _, ev := range pageResult.Items {
+			if ev.Status == "cancelled" {
+				log.Fatal("todo")
+			}
+			events = append(events, CalendarEvent{
+				Event:       ev,
+				CalendarUrl: cal.Url,
+				EventUrl:    MakeEventUrl(cal.Url, ev.Id),
+			})
+		}
 
-// Send a page of calendar.Events over a channel, as CalendarEvent structs.
-func sendEvents(events *calendar.Events, cal *mongoclient.MongoCal, out chan *FindEventsMessage) {
-	for _, event := range events.Items {
-		if event.Status == "cancelled" {
-			log.Fatal("todo")
+		syncToken = pageResult.NextSyncToken
+		if pageResult.NextPageToken == "" {
+			break
 		}
-		out <- &FindEventsMessage{
-			Event: &CalendarEvent{
-				Event:       event,
-				CalendarUrl: cal.Url,
-				EventUrl:    MakeEventUrl(cal.Url, event.Id),
-			}}
+		pageToken = pageResult.NextPageToken
 	}
+	mongoclient.LogWithCal(cal, "total events read: ", len(events), "with syncToken", syncToken)
+	return events, syncToken, nil
 }
 
 type eventWatch struct {
@@ -206,7 +226,7 @@
 }
 
 // Call this when there are likely new changes to sync.
-func (w *eventWatch) GetMoreEvents(out chan *FindEventsMessage) error {
+func (w *eventWatch) GetMoreEvents() error {
 	call := syncEventsCall(w.gc.srv, w.cal.GoogleId)
 
 	log.Println("listing events on", w.cal.GoogleId, "with")
@@ -227,7 +247,7 @@
 	w.nextSyncToken = events.NextSyncToken
 	w.nextPageToken = events.NextPageToken
 	log.Println(len(events.Items), "more events received")
-	sendEvents(events, w.cal, out)
+	// sendEvents(events, w.cal, out)
 	log.Println("got nextSyncToken=", w.nextSyncToken, " nextPageToken=", w.nextPageToken)
 	return err
 }