Mercurial > code > home > repos > gcalendarwatch
changeset 57:24f662799710
WIP incremental sync now runs
author | drewp@bigasterisk.com |
---|---|
date | Thu, 05 Sep 2024 15:03:05 -0700 |
parents | 635ff76f867c |
children | 6c7151126a0b |
files | calsync/event_sync.go calsync/gcalclient/event_requests.go calsync/gcalclient/gcalclient.go calsync/main.go |
diffstat | 4 files changed, 89 insertions(+), 185 deletions(-) [+] |
line wrap: on
line diff
--- a/calsync/event_sync.go Thu Sep 05 13:50:40 2024 -0700 +++ b/calsync/event_sync.go Thu Sep 05 15:03:05 2024 -0700 @@ -30,46 +30,67 @@ } log.Println("syncing events from", len(cals), "calendars") - readerErr := make(chan error) for _, cal := range cals { rd := newCalEventsReader(mc, gc, cal, syncStart, syncEnd) - go func() { - readerErr <- rd.updateForever() - }() + go rd.updateForever() } - return <-readerErr + return nil // return a mapping of webhook name to calEventsReader? } +// i think this should own the webhook watcher type calEventsReader struct { - mc *mongoclient.MongoClient - gc *gcalclient.GCalClient - cal mongoclient.MongoCal - t1 time.Time - t2 time.Time + mc *mongoclient.MongoClient + gc *gcalclient.GCalClient + cal mongoclient.MongoCal + t1 time.Time + t2 time.Time + syncToken string } func newCalEventsReader(mc *mongoclient.MongoClient, gc *gcalclient.GCalClient, cal mongoclient.MongoCal, t1 time.Time, t2 time.Time) *calEventsReader { - return &calEventsReader{mc, gc, cal, t1, t2} + return &calEventsReader{mc, gc, cal, t1, t2, ""} } -func (r *calEventsReader) updateForever() error { +func (r *calEventsReader) updateForever() { + err := r.updateInitialRange() + if err != nil { + log.Fatal(err) + } - var syncToken string - items, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) + for { + log.Println("syncing", r.cal.Summary, "with sync", r.syncToken) + err = r.sync() + if err != nil { + log.Fatal(err) + } + + time.Sleep(10 * time.Second) + } +} + +func (r *calEventsReader) updateInitialRange() error { + events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) + r.syncToken = nextSyncToken if err != nil { return err } - for _, item := range items { - r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&item.Event, time.Now() /*todo*/)) - syncToken = item.SyncToken + for _, ev := range events { + r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&ev, time.Now() /*todo*/)) } - r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) + return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) +} - for _, item := range r.gc.ListEventUpdatesForever(syncToken) { - // r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(item.ev, time.Now()/*todo*/)) - syncToken = item.SyncToken - _ = syncToken +func (r *calEventsReader) sync() error { + mongoclient.LogWithCal(r.cal, "syncing with", r.syncToken) + events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken) + if err != nil { + return err } + r.syncToken = nextSyncToken + for _, ev := range events { + r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent2(&ev, time.Now() /*todo*/)) + } + return nil }
--- a/calsync/gcalclient/event_requests.go Thu Sep 05 13:50:40 2024 -0700 +++ b/calsync/gcalclient/event_requests.go Thu Sep 05 15:03:05 2024 -0700 @@ -8,8 +8,7 @@ const pageSize = 20 -func rangedEventsCall(srv *calendar.Service, calGoogleId string, - initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall { +func rangedEventsCall(srv *calendar.Service, calGoogleId string, initialFillStart, initialFillEnd time.Time, pageToken string) *calendar.EventsListCall { return srv.Events.List(calGoogleId). ShowDeleted(false). SingleEvents(true). @@ -19,9 +18,11 @@ PageToken(pageToken) } -func syncEventsCall(srv *calendar.Service, calGoogleId string) *calendar.EventsListCall { +func syncEventsCall(srv *calendar.Service, calGoogleId string, syncToken, pageToken string) *calendar.EventsListCall { return srv.Events.List(calGoogleId). ShowDeleted(true). SingleEvents(true). - MaxResults(pageSize) + MaxResults(pageSize). + PageToken(pageToken). + SyncToken(syncToken) }
--- a/calsync/gcalclient/gcalclient.go Thu Sep 05 13:50:40 2024 -0700 +++ b/calsync/gcalclient/gcalclient.go Thu Sep 05 15:03:05 2024 -0700 @@ -76,55 +76,6 @@ list.Items = ret } -// // FindEvents considers all calendars. It spawns routines, does some initial sync, and returns. -// func (gc *GCalClient) FindEvents( -// mc *mongoclient.MongoClient, -// // For each calendar, after events in this time range have been sent to -// // `out`, the chan will get the other kind of FindEventsMessage (CalId, -// // ...). That message signals that the caller may cull old events on the given -// // calendar. After that point, all events will be updates (including -// // deletes). -// initialFillStart, initialFillEnd time.Time, -// out chan *FindEventsMessage, -// ) error { -// cals, err := mc.GetAllCals() -// if err != nil { -// return err -// } -// log.Println("reading", len(cals), "calendars") -// for calNum, cal := range cals { -// go gc.calRoutine(calNum, cal, initialFillStart, initialFillEnd, out) -// } - -// return nil -// } - -// func (gc *GCalClient) calRoutine(calNum int, cal mongoclient.MongoCal, initialFillStart time.Time, initialFillEnd time.Time, out chan *FindEventsMessage) bool { -// t := time.Now() -// log.Println(" cal", calNum, cal.Url) -// log.Println(" cal", calNum, "readEventsInRange", "from", initialFillStart, "to", initialFillEnd) -// syncToken, err := gc.readEventsInRange(&cal, initialFillStart, initialFillEnd, out) -// if err != nil { -// log.Panicln(err) -// return true -// } - -// out <- &FindEventsMessage{nil, cal.GoogleId, t} - -// ew := gc.NewEventWatch(&cal, t, syncToken) - -// for loop := 0; ; loop++ { -// log.Println("") -// log.Println("tail loop", loop, "for", cal.Url) -// err := ew.GetMoreEvents(out) -// if err != nil { -// log.Panicln(err) -// return true -// } -// time.Sleep(10 * time.Minute) -// } - -// } func shortDebugHash(pageToken string) string { if pageToken == "" { @@ -133,55 +84,54 @@ return fmt.Sprintf("%x", md5.Sum([]byte(pageToken))) } -type EventResult struct { - Event CalendarEvent - SyncToken string -} - func (gc *GCalClient) ListEventsInRange(cal mongoclient.MongoCal, t1, t2 time.Time) ( - []EventResult, error) { - ret := make([]EventResult, 0) + events []CalendarEvent, nextSyncToken string, err error) { mongoclient.LogWithCal(cal, "ListEventsInRange", t1, "to", t2) - syncToken := "" - var err error - var events []CalendarEvent - - events, syncToken, err = readEventsPages(gc, cal, t1, t2) + call := func(pageToken string) *calendar.EventsListCall { + return rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken) + } + events, nextSyncToken, err = readEventsPages(cal, call) if err != nil { - log.Panicln(err) - return ret, err - } - for _, e := range events { - ret = append(ret, EventResult{e, syncToken}) + return nil, "", err } - return ret, nil + return events, nextSyncToken, nil } -func (gc *GCalClient) ListEventUpdatesForever(syncToken string) []EventResult { - for { - time.Sleep(5 * time.Minute) +func (gc *GCalClient) ListEventUpdates(cal mongoclient.MongoCal, syncToken string) ( + events []CalendarEvent, nextSyncToken string, err error) { + mongoclient.LogWithCal(cal, "ListEventUpdates", syncToken) + + call := func(pageToken string) *calendar.EventsListCall { + return syncEventsCall(gc.srv, cal.GoogleId, syncToken, pageToken) + } + events, nextSyncToken, err = readEventsPages(cal, call) + if err != nil { + return nil, "", err } + return events, nextSyncToken, nil } -func readEventsPages(gc *GCalClient, cal mongoclient.MongoCal, t1, t2 time.Time) ( +func readEventsPages(cal mongoclient.MongoCal, call func(string) *calendar.EventsListCall) ( events []CalendarEvent, syncToken string, err error) { events = make([]CalendarEvent, 0) - syncToken = "" err = nil pageToken := "" for { mongoclient.LogWithCal(cal, "getting another page", shortDebugHash(pageToken)) - pageResult, err2 := rangedEventsCall(gc.srv, cal.GoogleId, t1, t2, pageToken).Do() + pageResult, err2 := call(pageToken).Do() if err2 != nil { log.Fatal(err2) return nil, "", err2 } + // Placement is important! This must run even if the result set is empty. + syncToken = pageResult.NextSyncToken + mongoclient.LogWithCal(cal, "got page with", len(pageResult.Items), "events") if len(pageResult.Items) == 0 { break @@ -198,7 +148,8 @@ }) } - syncToken = pageResult.NextSyncToken + + if pageResult.NextPageToken == "" { break } @@ -207,47 +158,3 @@ mongoclient.LogWithCal(cal, "total events read: ", len(events), "with syncToken", syncToken) return events, syncToken, nil } - -type eventWatch struct { - gc *GCalClient - cal *mongoclient.MongoCal - nextSyncToken string - nextPageToken string - modSince time.Time -} - -func (gc *GCalClient) NewEventWatch( - cal *mongoclient.MongoCal, - modSince time.Time, - syncToken string, -) *eventWatch { - ew := &eventWatch{gc, cal, syncToken, "", modSince} - return ew -} - -// Call this when there are likely new changes to sync. -func (w *eventWatch) GetMoreEvents() error { - call := syncEventsCall(w.gc.srv, w.cal.GoogleId) - - log.Println("listing events on", w.cal.GoogleId, "with") - if w.nextPageToken != "" { - call = call.PageToken(w.nextPageToken) - log.Println(" pageToken", w.nextPageToken) - } else if w.nextSyncToken != "" { - call = call.SyncToken(w.nextSyncToken) - log.Println(" syncToken", w.nextSyncToken) - } else { - call = call.UpdatedMin((w.modSince.Format(time.RFC3339))) - log.Println(" updatedMin", w.modSince.Format(time.RFC3339)) - } - events, err := call.Do() - if err != nil { - return err - } - w.nextSyncToken = events.NextSyncToken - w.nextPageToken = events.NextPageToken - log.Println(len(events.Items), "more events received") - // sendEvents(events, w.cal, out) - log.Println("got nextSyncToken=", w.nextSyncToken, " nextPageToken=", w.nextPageToken) - return err -}
--- a/calsync/main.go Thu Sep 05 13:50:40 2024 -0700 +++ b/calsync/main.go Thu Sep 05 15:03:05 2024 -0700 @@ -28,9 +28,6 @@ ) func main() { - _ = http.StripPrefix - _ = mux.NewRouter - ctx := context.Background() log.SetFlags(log.LstdFlags | log.Lshortfile) @@ -47,10 +44,10 @@ defer mc.Close() // todo: if a cal is deleted, nothing touches its db events ever again. - // err = updateMongoCalsToMatchGoogleOnce(mc, gc) - // if err != nil { - // log.Fatal(err) - // } + err = updateMongoCalsToMatchGoogleOnce(mc, gc) + if err != nil { + log.Fatal(err) + } err = updateMongoEventsToMatchGoogleForever(mc, gc, time.Duration(7*24)*time.Hour, @@ -58,41 +55,19 @@ if err != nil { log.Fatal(err) } - log.Fatal("err was nil: updateMongoEventsToMatchGoogleForever shouldn't have returned") - /* - ------------------ - connect to mongodb with these ops: - save cals list - get/set incremental token - add/edit/del events - collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]) - collection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}})) - - connect to https://github.com/googleapis/google-api-go-client/tree/main/calendar/v3 and: - get all my cals - - subscribe to events - - get cal event changes from incremental token + r := mux.NewRouter() + http.Handle("/", r) - get cal events in range, for initial fill? - - write add/edit/del changes to mongo - */ - - // r := mux.NewRouter() - // http.Handle("/", r) + home := func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("calsync service for calendar updates")) + } + r.HandleFunc("/", home) + r.HandleFunc("/gcalendarwatch", home) - // home := func(w http.ResponseWriter, r *http.Request) { - // w.Write([]byte("calsync service for calendar updates")) - // } - // r.HandleFunc("/", home) - // r.HandleFunc("/gcalendarwatch", home) - - // notificationHandler := func(w http.ResponseWriter, r *http.Request) { - // } - // r.HandleFunc("/gcalendarwatch/notification", notificationHandler).Methods("POST") - // log.Println(("serving /gcalendarwatch/notification on :8080")) - // log.Fatal(http.ListenAndServe(":8080", nil)) + notificationHandler := func(w http.ResponseWriter, r *http.Request) { + } + r.HandleFunc("/gcalendarwatch/notification", notificationHandler).Methods("POST") + log.Println(("serving /gcalendarwatch/notification on :8080")) + log.Fatal(http.ListenAndServe(":8080", nil)) }