Mercurial > code > home > repos > gcalendarwatch
changeset 56:635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
author | drewp@bigasterisk.com |
---|---|
date | Thu, 05 Sep 2024 13:50:40 -0700 |
parents | 627c815f83bb |
children | 24f662799710 |
files | calsync/cal_sync.go calsync/convert/convert.go calsync/event_sync.go calsync/gcalclient/event_requests.go calsync/gcalclient/gcalclient.go calsync/main.go calsync/mongoclient/events_collection.go |
diffstat | 7 files changed, 212 insertions(+), 154 deletions(-) [+] |
line wrap: on
line diff
--- a/calsync/cal_sync.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/cal_sync.go Thu Sep 05 13:50:40 2024 -0700 @@ -8,12 +8,12 @@ "bigasterisk.com/go/gcalendarwatch/mongoclient" ) -func updateMongoCalsToMatchGoogle(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) { +func updateMongoCalsToMatchGoogleOnce(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) (err error) { log.Println("updateMongoCalsToMatchGoogle") seen := make(map[string]bool) - cals, err := gc.AllCalendars(100) + cals, err := gc.AllCalendars() if err != nil { return err } @@ -22,8 +22,15 @@ calUrl := gcalclient.MakeCalUrl(cal.Id) log.Println("syncing", calUrl) seen[calUrl] = true - mc.UpsertOneCal(convert.MongoCalFromGoogleCal(cal)) + err = mc.UpsertOneCal(convert.MongoCalFromGoogleCal(cal)) + if err != nil { + return err + } + log.Println("syncing", calUrl, "done") } - return mc.DeleteCalsNotInSet(seen) + err = mc.DeleteCalsNotInSet(seen) + + log.Println("updateMongoCalsToMatchGoogle done") + return err }
--- a/calsync/convert/convert.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/convert/convert.go Thu Sep 05 13:50:40 2024 -0700 @@ -19,7 +19,6 @@ } func MongoEventFromGoogleEvent2( - calUrl string, ev *gcalclient.CalendarEvent, now time.Time, ) mongoclient.MongoEvent {
--- a/calsync/event_sync.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/event_sync.go Thu Sep 05 13:50:40 2024 -0700 @@ -9,46 +9,67 @@ "bigasterisk.com/go/gcalendarwatch/mongoclient" ) -// Runs forever. Applies to all calendars. -func updateMongoEventsToMatchGoogle( - mc *mongoclient.MongoClient, gc *gcalclient.GCalClient) error { - t := time.Now() - eventUpdates := make(chan *gcalclient.FindEventsMessage) +// Each calendar syncs like this: +// 1. Full sync of events taking place between `now-keepHistory` to `now+syncAhead`. +// 2. Garbage-collect all events last-modified before `now-keepHistory` +// 3. Continuous watch of each calendar to catch updates. +func updateMongoEventsToMatchGoogleForever( + mc *mongoclient.MongoClient, + gc *gcalclient.GCalClient, + keepHistory time.Duration, + syncAhead time.Duration) error { + + now := time.Now() + syncStart := now.Add(-keepHistory) + syncEnd := now.Add(syncAhead) + // Note that we could receive updates outside this interval. - updateRoutine(eventUpdates, gc, mc) + cals, err := mc.GetAllCals() + if err != nil { + return err + } + log.Println("syncing events from", len(cals), "calendars") + + readerErr := make(chan error) + for _, cal := range cals { + rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd) + go func() { + readerErr <- rd.updateForever() + }() + } + return <-readerErr +} - for ev := range eventUpdates { - log.Println("up for ev", ev.CalId) - if ev.Event != nil { - mc.UpsertOneEvent( - convert.MongoEventFromGoogleEvent2( - ev.Event.CalendarUrl, - ev.Event, - /*modSince=*/ t, - ), - ) - } else { - log.Println("cal", ev.CalId, "ready for cleanup") - log.Println("t=", t) - mc.DeleteEventsUpdatedBefore(t) - } +type calEventsReader struct { + mc *mongoclient.MongoClient + gc *gcalclient.GCalClient + cal mongoclient.MongoCal + t1 time.Time + t2 time.Time +} + +func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal mongoclient.MongoCal, t1 time.Time, t2 time.Time) *calEventsReader { + return &calEventsReader{mc, gc, cal, t1, t2} +} + +func (r *calEventsReader) updateForever() error { + + var syncToken string + items, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) + if err != nil { + return err } - log.Fatalln("updateMongoEventsToMatchGoogle done??") + for _, item := range items { + r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&item.Event, time.Now() /*todo*/)) + syncToken = item.SyncToken + } + + r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) + + for _, item := range r.gc.ListEventUpdatesForever(syncToken) { + // r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(item.ev, time.Now()/*todo*/)) + syncToken = item.SyncToken + _ = syncToken + } return nil } - -func updateRoutine( - eventUpdates chan *gcalclient.FindEventsMessage, - gc *gcalclient.GCalClient, - mc *mongoclient.MongoClient, -) { - defer close(eventUpdates) - - t := time.Now() - err := gc.FindEvents(mc, t.AddDate(0, 0, -3), t.AddDate(0, 0, 7), eventUpdates) - if err != nil { - log.Println(err) - return - } - log.Println("updateRoutine done") -}
--- a/calsync/gcalclient/event_requests.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/gcalclient/event_requests.go Thu Sep 05 13:50:40 2024 -0700 @@ -6,7 +6,7 @@ "google.golang.org/api/calendar/v3" ) -const pageSize = 4 +const pageSize = 20 func rangedEventsCall(srv *calendar.Service, calGoogleId string, initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall {
--- a/calsync/gcalclient/gcalclient.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/gcalclient/gcalclient.go Thu Sep 05 13:50:40 2024 -0700 @@ -1,7 +1,14 @@ package gcalclient +/* +Note: this module keeps gcal *paging* private (we fetch all the pages), but +*sync* is public. At least, callers may receive a syncToken and have to pass it +back in. +*/ import ( "context" + "crypto/md5" + "fmt" "log" "net/url" "strings" @@ -45,9 +52,9 @@ // todo: disconnect watches if possible } -func (gc *GCalClient) AllCalendars(maxResults int64) ([]*calendar.CalendarListEntry, error) { +func (gc *GCalClient) AllCalendars() ([]*calendar.CalendarListEntry, error) { // todo: pagination - list, err := gc.srv.CalendarList.List().MaxResults(maxResults).Do() + list, err := gc.srv.CalendarList.List().MaxResults( /*maxResults*/ 100).Do() if err != nil { return nil, err } @@ -69,123 +76,136 @@ list.Items = ret } -type FindEventsMessage struct { - // either non-nil this: - Event *CalendarEvent - // or these: - CalId string - OlderThanThisIsDeletable time.Time +// // FindEvents considers all calendars. It spawns routines, does some initial sync, and returns. +// func (gc *GCalClient) FindEvents( +// mc *mongoclient.MongoClient, +// // For each calendar, after events in this time range have been sent to +// // `out`, the chan will get the other kind of FindEventsMessage (CalId, +// // ...). That message signals that the caller may cull old events on the given +// // calendar. After that point, all events will be updates (including +// // deletes). +// initialFillStart, initialFillEnd time.Time, +// out chan *FindEventsMessage, +// ) error { +// cals, err := mc.GetAllCals() +// if err != nil { +// return err +// } +// log.Println("reading", len(cals), "calendars") +// for calNum, cal := range cals { +// go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out) +// } + +// return nil +// } + +// func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool { +// t := time.Now() +// log.Println(" cal", calNum, cal.Url) +// log.Println(" cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd) +// syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out) +// if err != nil { +// log.Panicln(err) +// return true +// } + +// out <- &FindEventsMessage{nil, cal.GoogleId, t} + +// ew := gc.NewEventWatch(&cal, t, syncToken) + +// for loop := 0; ; loop++ { +// log.Println("") +// log.Println("tail loop", loop, "for", cal.Url) +// err := ew.GetMoreEvents(out) +// if err != nil { +// log.Panicln(err) +// return true +// } +// time.Sleep(10 * time.Minute) +// } + +// } + +func shortDebugHash(pageToken string) string { + if pageToken == "" { + return "(empty)" + } + return fmt.Sprintf("%x", md5.Sum([]byte(pageToken))) } -// FindEvents considers all calendars. It spawns routines, does some initial sync, and returns. -func (gc *GCalClient) FindEvents( - mc *mongoclient.MongoClient, - // For each calendar, after events in this time range have been sent to - // `out`, the chan will get the other kind of FindEventsMessage (CalId, - // ...). That message signals that the caller may cull old events on the given - // calendar. After that point, all events will be updates (including - // deletes). - initialFillStart, initialFillEnd time.Time, - out chan *FindEventsMessage, -) error { - cals, err := mc.GetAllCals() - if err != nil { - return err - } - log.Println("reading", len(cals), "calendars") - for calNum, cal := range cals { - go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out) - } - - return nil +type EventResult struct { + Event CalendarEvent + SyncToken string } -func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool { - t := time.Now() - log.Println(" cal", calNum, cal.Url) - log.Println(" cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd) - syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out) +func (gc *GCalClient) ListEventsInRange(cal mongoclient.MongoCal, t1, t2 time.Time) ( + []EventResult, error) { + ret := make([]EventResult, 0) + mongoclient.LogWithCal(cal, "ListEventsInRange", t1, "to", t2) + + syncToken := "" + var err error + var events []CalendarEvent + + events, syncToken, err = readEventsPages(gc, cal, t1, t2) if err != nil { log.Panicln(err) - return true + return ret, err + } + for _, e := range events { + ret = append(ret, EventResult{e, syncToken}) } - out <- &FindEventsMessage{nil, cal.GoogleId, t} - - ew := gc.NewEventWatch(&cal, t, syncToken) + return ret, nil +} - for loop := 0; ; loop++ { - log.Println("") - log.Println("tail loop", loop, "for", cal.Url) - err := ew.GetMoreEvents(out) - if err != nil { - log.Panicln(err) - return true - } - time.Sleep(10 * time.Minute) +func (gc *GCalClient) ListEventUpdatesForever(syncToken string) []EventResult { + for { + time.Sleep(5 * time.Minute) } } -// Synchronous. -func (gc *GCalClient) readEventsInRange(cal *mongoclient.MongoCal, t1, t2 time.Time, out chan *FindEventsMessage) (string, error) { - log.Println(" get initial events for", cal.Url, "between", t1, "and", t2) +func readEventsPages(gc *GCalClient, cal mongoclient.MongoCal, t1, t2 time.Time) ( + events []CalendarEvent, syncToken string, err error) { + + events = make([]CalendarEvent, 0) + syncToken = "" + err = nil pageToken := "" - syncToken := "" - var err error - var done bool for { - done, syncToken, pageToken, err = readEventsPage(gc, cal, t1, t2, pageToken, out) - if err != nil { - log.Panicln(err) - return "", err + mongoclient.LogWithCal(cal, "getting another page", shortDebugHash(pageToken)) + pageResult, err2 := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do() + if err2 != nil { + log.Fatal(err2) + return nil, "", err2 } - if done { + + mongoclient.LogWithCal(cal, "got page with", len(pageResult.Items), "events") + if len(pageResult.Items) == 0 { break } - } - return syncToken, nil -} - -func readEventsPage(gc *GCalClient, cal *mongoclient.MongoCal, t1, t2 time.Time, pageToken string, out chan *FindEventsMessage) (done bool, syncToken, nextPageToken string, err error) { - log.Println(" getting another page", pageToken) - events, err := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do() - if err != nil { - return - } - - log.Println(" got", len(events.Items), "events, sync=", events.NextSyncToken) - if len(events.Items) == 0 { - done = true - return - } - - sendEvents(events, cal, out) - syncToken = events.NextSyncToken - if events.NextPageToken == "" { - done = true - return - } - nextPageToken = events.NextPageToken - syncToken = events.NextSyncToken - return -} + for _, ev := range pageResult.Items { + if ev.Status == "cancelled" { + log.Fatal("todo") + } + events = append(events, CalendarEvent{ + Event: ev, + CalendarUrl: cal.Url, + EventUrl: MakeEventUrl(cal.Url, ev.Id), + }) + } -// Send a page of calendar.Events over a channel, as CalendarEvent structs. -func sendEvents(events *calendar.Events, cal *mongoclient.MongoCal, out chan *FindEventsMessage) { - for _, event := range events.Items { - if event.Status == "cancelled" { - log.Fatal("todo") + syncToken = pageResult.NextSyncToken + if pageResult.NextPageToken == "" { + break } - out <- &FindEventsMessage{ - Event: &CalendarEvent{ - Event: event, - CalendarUrl: cal.Url, - EventUrl: MakeEventUrl(cal.Url, event.Id), - }} + pageToken = pageResult.NextPageToken } + mongoclient.LogWithCal(cal, "total events read: ", len(events), "with syncToken", syncToken) + return events, syncToken, nil } type eventWatch struct { @@ -206,7 +226,7 @@ } // Call this when there are likely new changes to sync. -func (w *eventWatch) GetMoreEvents(out chan *FindEventsMessage) error { +func (w *eventWatch) GetMoreEvents() error { call := syncEventsCall(w.gc.srv, w.cal.GoogleId) log.Println("listing events on", w.cal.GoogleId, "with") @@ -227,7 +247,7 @@ w.nextSyncToken = events.NextSyncToken w.nextPageToken = events.NextPageToken log.Println(len(events.Items), "more events received") - sendEvents(events, w.cal, out) + // sendEvents(events, w.cal, out) log.Println("got nextSyncToken=", w.nextSyncToken, " nextPageToken=", w.nextPageToken) return err }
--- a/calsync/main.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/main.go Thu Sep 05 13:50:40 2024 -0700 @@ -20,6 +20,7 @@ "context" "log" "net/http" + "time" "bigasterisk.com/go/gcalendarwatch/gcalclient" "bigasterisk.com/go/gcalendarwatch/mongoclient" @@ -27,8 +28,8 @@ ) func main() { - _=http.StripPrefix - _=mux.NewRouter + _ = http.StripPrefix + _ = mux.NewRouter ctx := context.Background() @@ -45,17 +46,20 @@ } defer mc.Close() - err = updateMongoCalsToMatchGoogle(mc, gc) + // todo: if a cal is deleted, nothing touches its db events ever again. + // err = updateMongoCalsToMatchGoogleOnce(mc, gc) + // if err != nil { + // log.Fatal(err) + // } + + err = updateMongoEventsToMatchGoogleForever(mc, gc, + time.Duration(7*24)*time.Hour, + time.Duration(14*24)*time.Hour) if err != nil { log.Fatal(err) } + log.Fatal("err was nil: updateMongoEventsToMatchGoogleForever shouldn't have returned") - err = updateMongoEventsToMatchGoogle(mc, gc) - if err != nil { - log.Fatal(err) - } - - /* ------------------ connect to mongodb with these ops:
--- a/calsync/mongoclient/events_collection.go Tue Sep 03 14:50:20 2024 -0700 +++ b/calsync/mongoclient/events_collection.go Thu Sep 05 13:50:40 2024 -0700 @@ -8,6 +8,10 @@ "go.mongodb.org/mongo-driver/mongo/options" ) +func LogWithCal(cal MongoCal, msg ...interface{}) { + log.Println("cal:", (cal.GoogleId+"........")[:8], msg) +} + func (c *MongoClient) UpsertOneEvent(ev MongoEvent) error { filter := bson.M{"_id": ev.Url} setFields := ev @@ -19,9 +23,12 @@ return nil } -func (c *MongoClient) DeleteEventsUpdatedBefore(t time.Time) error { - res, err := c.eventsCollection.DeleteMany(c.ctx, bson.M{"lastUpdated": bson.M{"$lt": t}}) - log.Println("deleted", res.DeletedCount, "events") +func (c *MongoClient) DeleteEventsUpdatedBefore(cal MongoCal, t time.Time) error { + res, err := c.eventsCollection.DeleteMany( + c.ctx, + bson.M{"calendarUrl": cal.Url, + "lastUpdated": bson.M{"$lt": t}}) + LogWithCal(cal, "deleted", res.DeletedCount, "events updated before", t) if err != nil { return err }