changeset 57:24f662799710

WIP incremental sync now runs
author drewp@bigasterisk.com
date Thu, 05 Sep 2024 15:03:05 -0700
parents 635ff76f867c
children 6c7151126a0b
files calsync/event_sync.go calsync/gcalclient/event_requests.go calsync/gcalclient/gcalclient.go calsync/main.go
diffstat 4 files changed, 89 insertions(+), 185 deletions(-) [+]
line wrap: on
line diff
--- a/calsync/event_sync.go	Thu Sep 05 13:50:40 2024 -0700
+++ b/calsync/event_sync.go	Thu Sep 05 15:03:05 2024 -0700
@@ -30,46 +30,67 @@
 	}
 	log.Println("syncing events from", len(cals), "calendars")
 
-	readerErr := make(chan error)
 	for _, cal := range cals {
 		rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd)
-		go func() {
-			readerErr <- rd.updateForever()
-		}()
+		go rd.updateForever()
 	}
-	return <-readerErr
+	return nil // return a mapping of webhook name to calEventsReader?
 }
 
+// i think this should own the webhook watcher
 type calEventsReader struct {
-	mc  *mongoclient.MongoClient
-	gc  *gcalclient.GCalClient
-	cal mongoclient.MongoCal
-	t1  time.Time
-	t2  time.Time
+	mc        *mongoclient.MongoClient
+	gc        *gcalclient.GCalClient
+	cal       mongoclient.MongoCal
+	t1        time.Time
+	t2        time.Time
+	syncToken string
 }
 
 func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal mongoclient.MongoCal, t1 time.Time, t2 time.Time) *calEventsReader {
-	return &calEventsReader{mc, gc, cal, t1, t2}
+	return &calEventsReader{mc, gc, cal, t1, t2, ""}
 }
 
-func (r *calEventsReader) updateForever() error {
+func (r *calEventsReader) updateForever() {
+	err := r.updateInitialRange()
+	if err != nil {
+		log.Fatal(err)
+	}
 
-	var syncToken string
-	items, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2)
+	for {
+		log.Println("syncing", r.cal.Summary, "with sync", r.syncToken)
+		err = r.sync()
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		time.Sleep(10 * time.Second)
+	}
+}
+
+func (r *calEventsReader) updateInitialRange() error {
+	events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2)
+	r.syncToken = nextSyncToken
 	if err != nil {
 		return err
 	}
-	for _, item := range items {
-		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&item.Event, time.Now() /*todo*/))
-		syncToken = item.SyncToken
+	for _, ev := range events {
+		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&ev, time.Now() /*todo*/))
 	}
 
-	r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1)
+	return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1)
+}
 
-	for _, item := range r.gc.ListEventUpdatesForever(syncToken) {
-		// r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(item.ev, time.Now()/*todo*/))
-		syncToken = item.SyncToken
-		_ = syncToken
+func (r *calEventsReader) sync() error {
+	mongoclient.LogWithCal(r.cal, "syncing with", r.syncToken)
+	events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken)
+	if err != nil {
+		return err
 	}
+	r.syncToken = nextSyncToken
+	for _, ev := range events {
+		r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&ev, time.Now() /*todo*/))
+	}
+
 	return nil
 }
--- a/calsync/gcalclient/event_requests.go	Thu Sep 05 13:50:40 2024 -0700
+++ b/calsync/gcalclient/event_requests.go	Thu Sep 05 15:03:05 2024 -0700
@@ -8,8 +8,7 @@
 
 const pageSize = 20
 
-func rangedEventsCall(srv *calendar.Service, calGoogleId string,
-	initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall {
+func rangedEventsCall(srv *calendar.Service, calGoogleId string, initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall {
 	return srv.Events.List(calGoogleId).
 		ShowDeleted(false).
 		SingleEvents(true).
@@ -19,9 +18,11 @@
 		PageToken(pageToken)
 }
 
-func syncEventsCall(srv *calendar.Service, calGoogleId string) *calendar.EventsListCall {
+func syncEventsCall(srv *calendar.Service, calGoogleId string, syncToken, pageToken string) *calendar.EventsListCall {
 	return srv.Events.List(calGoogleId).
 		ShowDeleted(true).
 		SingleEvents(true).
-		MaxResults(pageSize)
+		MaxResults(pageSize).
+		PageToken(pageToken).
+		SyncToken(syncToken)
 }
--- a/calsync/gcalclient/gcalclient.go	Thu Sep 05 13:50:40 2024 -0700
+++ b/calsync/gcalclient/gcalclient.go	Thu Sep 05 15:03:05 2024 -0700
@@ -76,55 +76,6 @@
 	list.Items = ret
 }
 
-// // FindEvents considers all calendars. It spawns routines, does some initial sync, and returns.
-// func (gc *GCalClient) FindEvents(
-// 	mc *mongoclient.MongoClient,
-// 	// For each calendar, after events in this time range have been sent to
-// 	// `out`, the chan will get the other kind of FindEventsMessage (CalId,
-// 	// ...). That message signals that the caller may cull old events on the given
-// 	// calendar. After that point, all events will be updates (including
-// 	// deletes).
-// 	initialFillStart, initialFillEnd time.Time,
-// 	out chan *FindEventsMessage,
-// ) error {
-// 	cals, err := mc.GetAllCals()
-// 	if err != nil {
-// 		return err
-// 	}
-// 	log.Println("reading", len(cals), "calendars")
-// 	for calNum, cal := range cals {
-// 		go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out)
-// 	}
-
-// 	return nil
-// }
-
-// func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool {
-// 	t := time.Now()
-// 	log.Println("  cal", calNum, cal.Url)
-// 	log.Println("  cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd)
-// 	syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out)
-// 	if err != nil {
-// 		log.Panicln(err)
-// 		return true
-// 	}
-
-// 	out <- &FindEventsMessage{nil, cal.GoogleId, t}
-
-// 	ew := gc.NewEventWatch(&cal, t, syncToken)
-
-// 	for loop := 0; ; loop++ {
-// 		log.Println("")
-// 		log.Println("tail loop", loop, "for", cal.Url)
-// 		err := ew.GetMoreEvents(out)
-// 		if err != nil {
-// 			log.Panicln(err)
-// 			return true
-// 		}
-// 		time.Sleep(10 * time.Minute)
-// 	}
-
-// }
 
 func shortDebugHash(pageToken string) string {
 	if pageToken == "" {
@@ -133,55 +84,54 @@
 	return fmt.Sprintf("%x", md5.Sum([]byte(pageToken)))
 }
 
-type EventResult struct {
-	Event     CalendarEvent
-	SyncToken string
-}
-
 func (gc *GCalClient) ListEventsInRange(cal mongoclient.MongoCal, t1, t2 time.Time) (
-	[]EventResult, error) {
-	ret := make([]EventResult, 0)
+	events []CalendarEvent, nextSyncToken string, err error) {
 	mongoclient.LogWithCal(cal, "ListEventsInRange", t1, "to", t2)
 
-	syncToken := ""
-	var err error
-	var events []CalendarEvent
-
-	events, syncToken, err = readEventsPages(gc, cal, t1, t2)
+	call := func(pageToken string) *calendar.EventsListCall {
+		return rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken)
+	}
+	events, nextSyncToken, err = readEventsPages(cal, call)
 	if err != nil {
-		log.Panicln(err)
-		return ret, err
-	}
-	for _, e := range events {
-		ret = append(ret, EventResult{e, syncToken})
+		return nil, "", err
 	}
 
-	return ret, nil
+	return events, nextSyncToken, nil
 }
 
-func (gc *GCalClient) ListEventUpdatesForever(syncToken string) []EventResult {
-	for {
-		time.Sleep(5 * time.Minute)
+func (gc *GCalClient) ListEventUpdates(cal mongoclient.MongoCal, syncToken string) (
+	events []CalendarEvent, nextSyncToken string, err error) {
+	mongoclient.LogWithCal(cal, "ListEventUpdates", syncToken)
+
+	call := func(pageToken string) *calendar.EventsListCall {
+		return syncEventsCall(gc.srv, cal.GoogleId, syncToken, pageToken)
+	}
+	events, nextSyncToken, err = readEventsPages(cal, call)
+	if err != nil {
+		return nil, "", err
 	}
 
+	return events, nextSyncToken, nil
 }
 
-func readEventsPages(gc *GCalClient, cal mongoclient.MongoCal, t1, t2 time.Time) (
+func readEventsPages(cal mongoclient.MongoCal, call func(string) *calendar.EventsListCall) (
 	events []CalendarEvent, syncToken string, err error) {
 
 	events = make([]CalendarEvent, 0)
-	syncToken = ""
 	err = nil
 
 	pageToken := ""
 	for {
 		mongoclient.LogWithCal(cal, "getting another page", shortDebugHash(pageToken))
-		pageResult, err2 := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do()
+		pageResult, err2 := call(pageToken).Do()
 		if err2 != nil {
 			log.Fatal(err2)
 			return nil, "", err2
 		}
 
+		// Placement is important! This must run even if the result set is empty.
+		syncToken = pageResult.NextSyncToken
+
 		mongoclient.LogWithCal(cal, "got page with", len(pageResult.Items), "events")
 		if len(pageResult.Items) == 0 {
 			break
@@ -198,7 +148,8 @@
 			})
 		}
 
-		syncToken = pageResult.NextSyncToken
+
+
 		if pageResult.NextPageToken == "" {
 			break
 		}
@@ -207,47 +158,3 @@
 	mongoclient.LogWithCal(cal, "total events read: ", len(events), "with syncToken", syncToken)
 	return events, syncToken, nil
 }
-
-type eventWatch struct {
-	gc            *GCalClient
-	cal           *mongoclient.MongoCal
-	nextSyncToken string
-	nextPageToken string
-	modSince      time.Time
-}
-
-func (gc *GCalClient) NewEventWatch(
-	cal *mongoclient.MongoCal,
-	modSince time.Time,
-	syncToken string,
-) *eventWatch {
-	ew := &eventWatch{gc, cal, syncToken, "", modSince}
-	return ew
-}
-
-// Call this when there are likely new changes to sync.
-func (w *eventWatch) GetMoreEvents() error {
-	call := syncEventsCall(w.gc.srv, w.cal.GoogleId)
-
-	log.Println("listing events on", w.cal.GoogleId, "with")
-	if w.nextPageToken != "" {
-		call = call.PageToken(w.nextPageToken)
-		log.Println("   pageToken", w.nextPageToken)
-	} else if w.nextSyncToken != "" {
-		call = call.SyncToken(w.nextSyncToken)
-		log.Println("   syncToken", w.nextSyncToken)
-	} else {
-		call = call.UpdatedMin((w.modSince.Format(time.RFC3339)))
-		log.Println("   updatedMin", w.modSince.Format(time.RFC3339))
-	}
-	events, err := call.Do()
-	if err != nil {
-		return err
-	}
-	w.nextSyncToken = events.NextSyncToken
-	w.nextPageToken = events.NextPageToken
-	log.Println(len(events.Items), "more events received")
-	// sendEvents(events, w.cal, out)
-	log.Println("got nextSyncToken=", w.nextSyncToken, " nextPageToken=", w.nextPageToken)
-	return err
-}
--- a/calsync/main.go	Thu Sep 05 13:50:40 2024 -0700
+++ b/calsync/main.go	Thu Sep 05 15:03:05 2024 -0700
@@ -28,9 +28,6 @@
 )
 
 func main() {
-	_ = http.StripPrefix
-	_ = mux.NewRouter
-
 	ctx := context.Background()
 
 	log.SetFlags(log.LstdFlags | log.Lshortfile)
@@ -47,10 +44,10 @@
 	defer mc.Close()
 
 	// todo: if a cal is deleted, nothing touches its db events ever again.
-	// err = updateMongoCalsToMatchGoogleOnce(mc, gc)
-	// if err != nil {
-	// 	log.Fatal(err)
-	// }
+	err = updateMongoCalsToMatchGoogleOnce(mc, gc)
+	if err != nil {
+		log.Fatal(err)
+	}
 
 	err = updateMongoEventsToMatchGoogleForever(mc, gc,
 		time.Duration(7*24)*time.Hour, 
@@ -58,41 +55,19 @@
 	if err != nil {
 		log.Fatal(err)
 	}
-	log.Fatal("err was nil: updateMongoEventsToMatchGoogleForever shouldn't have returned")
 
-	/*
-		------------------
-		connect to mongodb with these ops:
-		    save cals list
-		    get/set incremental token
-			add/edit/del events
-		    collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)])
-		    collection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}}))
-
-		connect to https://github.com/googleapis/google-api-go-client/tree/main/calendar/v3 and:
-			get all my cals
-
-			subscribe to events
-
-			get cal event changes from incremental token
+	r := mux.NewRouter()
+	http.Handle("/", r)
 
-			get cal events in range, for initial fill?
-
-		    write add/edit/del changes to mongo
-	*/
-
-	// r := mux.NewRouter()
-	// http.Handle("/", r)
+	home := func(w http.ResponseWriter, r *http.Request) {
+		w.Write([]byte("calsync service for calendar updates"))
+	}
+	r.HandleFunc("/", home)
+	r.HandleFunc("/gcalendarwatch", home)
 
-	// home := func(w http.ResponseWriter, r *http.Request) {
-	// 	w.Write([]byte("calsync service for calendar updates"))
-	// }
-	// r.HandleFunc("/", home)
-	// r.HandleFunc("/gcalendarwatch", home)
-
-	// notificationHandler := func(w http.ResponseWriter, r *http.Request) {
-	// }
-	// r.HandleFunc("/gcalendarwatch/notification", notificationHandler).Methods("POST")
-	// log.Println(("serving /gcalendarwatch/notification on :8080"))
-	// log.Fatal(http.ListenAndServe(":8080", nil))
+	notificationHandler := func(w http.ResponseWriter, r *http.Request) {
+	}
+	r.HandleFunc("/gcalendarwatch/notification", notificationHandler).Methods("POST")
+	log.Println(("serving /gcalendarwatch/notification on :8080"))
+	log.Fatal(http.ListenAndServe(":8080", nil))
 }