Mercurial > code > home > repos > gcalendarwatch
annotate calsync/sync_event.go @ 81:79320eff10f2
support status=="cancelled" to delete events
author | drewp@bigasterisk.com |
---|---|
date | Fri, 06 Sep 2024 18:26:13 -0700 |
parents | e8164bd2f9a1 |
children |
rev | line source |
---|---|
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
1 package main |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
2 |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import ( |
77 | 4 "fmt" |
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
5 "log" |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
6 "math/rand" |
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
7 "time" |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
8 |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
9 "bigasterisk.com/go/gcalendarwatch/convert" |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
10 "bigasterisk.com/go/gcalendarwatch/gcalclient" |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
11 "bigasterisk.com/go/gcalendarwatch/mongoclient" |
58 | 12 M "bigasterisk.com/go/gcalendarwatch/mongoclienttypes" |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
13 "bigasterisk.com/go/gcalendarwatch/notificationrouter" |
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
14 ) |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
15 |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
16 // Time that we'll still accept the old watchId after we've registered a new one. |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
17 const grace = 1 * time.Minute |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
18 |
80 | 19 // How often to renew watches (roughly). |
20 const watchLifetime = time.Duration(120) * time.Minute | |
21 | |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
22 // Each calendar syncs like this: |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
23 // 1. Full sync of events taking place between `now-initialSyncBack` to `now+initialSyncAhead`. |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
24 // 2. Garbage-collect all events last-modified before `now-initialSyncBack` |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
25 // 3. Continuous watch of each calendar to catch updates. |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
26 func updateMongoEventsToMatchGoogleForever( |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
27 mc *mongoclient.MongoClient, |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
28 gc *gcalclient.GCalClient, |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
29 initialSyncBack time.Duration, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
30 initialSyncAhead time.Duration, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
31 router *notificationrouter.NotificationRouter, |
80 | 32 startupJitter time.Duration, |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
33 ) error { |
60 | 34 log.Println("starting updateMongoEventsToMatchGoogleForever") |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
35 |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
36 now := time.Now() |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
37 initialSyncT1 := now.Add(-initialSyncBack) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
38 initialSyncT2 := now.Add(initialSyncAhead) |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
39 // Note that we could receive updates outside this interval. |
52 | 40 |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
41 cals, err := mc.GetAllCals() |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
42 if err != nil { |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
43 return err |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
44 } |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
45 log.Println("syncing events from", len(cals), "calendars") |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
46 |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
47 for _, cal := range cals { |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
48 rd := newCalEventsReader(mc, gc, router, cal, initialSyncT1, initialSyncT2) |
77 | 49 go func() { |
80 | 50 err := rd.watchForUpdates(startupJitter) |
77 | 51 if err != nil { |
52 log.Println("ERROR", M.Prefix(rd.cal), "watchForUpdates: ", err, "(aborting this calendar)") | |
53 } | |
54 }() | |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
55 } |
60 | 56 return nil |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
57 } |
52 | 58 |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
59 type calEventsReader struct { |
57 | 60 mc *mongoclient.MongoClient |
61 gc *gcalclient.GCalClient | |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
62 router *notificationrouter.NotificationRouter |
58 | 63 cal M.MongoCal |
64 t1, t2 time.Time | |
57 | 65 syncToken string |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
66 } |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
67 |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
68 func newCalEventsReader( |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
69 mc *mongoclient.MongoClient, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
70 gc *gcalclient.GCalClient, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
71 router *notificationrouter.NotificationRouter, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
72 cal M.MongoCal, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
73 t1, t2 time.Time, |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
74 ) *calEventsReader { |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
75 return &calEventsReader{mc, gc, router, cal, t1, t2, ""} |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
76 } |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
77 |
80 | 78 func (r *calEventsReader) watchForUpdates(startupJitter time.Duration) error { |
79 r.watchForUpdatesJitteredStartup(startupJitter) | |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
80 |
57 | 81 err := r.updateInitialRange() |
82 if err != nil { | |
77 | 83 return err |
57 | 84 } |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
85 |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
86 syncFunc := notificationrouter.SyncFunc(func() { |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
87 err := r.sync() |
57 | 88 if err != nil { |
77 | 89 log.Fatalln("ERROR", M.Prefix(r.cal), "sync: ", err) |
57 | 90 } |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
91 }) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
92 go r.renewWatchEventsForever(watchLifetime, syncFunc) |
77 | 93 return nil |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
94 } |
57 | 95 |
80 | 96 func (r *calEventsReader) watchForUpdatesJitteredStartup(startupJitter time.Duration) { |
97 jitteredStartup := time.Duration(startupJitter.Seconds()*rand.Float64()) * time.Second | |
75 | 98 log.Println(M.Prefix(r.cal), "watchForEvents starting in", jitteredStartup) |
99 time.Sleep(jitteredStartup) | |
100 } | |
101 | |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
102 func (r *calEventsReader) renewWatchEventsForever(lifetime time.Duration, syncFunc notificationrouter.SyncFunc) { |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
103 log.Println(M.Prefix(r.cal), "renewWatchEventsForever") |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
104 |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
105 for { |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
106 watchId := notificationrouter.NewWatchId() |
75 | 107 jitterScale := 0.5 + 1.0*rand.Float64() |
61
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
108 jitteredLifetime := time.Duration(lifetime.Seconds()*jitterScale) * time.Second |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
109 |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
110 r.router.AddHandler(watchId, syncFunc, jitteredLifetime+grace) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
111 |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
112 err := r.gc.WatchEvents(&r.cal, watchId, jitteredLifetime+grace) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
113 if err != nil { |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
114 log.Println("ERROR", M.Prefix(r.cal), "can't sync this cal: ", err) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
115 return |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
116 } |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
117 time.Sleep(jitteredLifetime) |
8aee4f5c4bdd
receive notifications and route them to calendar sync functions
drewp@bigasterisk.com
parents:
60
diff
changeset
|
118 log.Println(M.Prefix(r.cal), "watch id=", watchId, "is expiring") |
57 | 119 } |
120 } | |
121 | |
122 func (r *calEventsReader) updateInitialRange() error { | |
123 events, nextSyncToken, err := r.gc.ListEventsInRange(r.cal, r.t1, r.t2) | |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
124 if err != nil { |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
125 return err |
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
126 } |
78 | 127 r.syncToken = nextSyncToken |
81
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
128 err = r.upsertAndDeleteEvents(events) |
78 | 129 if err != nil { |
130 return err | |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
131 } |
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
132 |
57 | 133 return r.mc.DeleteEventsUpdatedBefore(r.cal, r.t1) |
134 } | |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
135 |
57 | 136 func (r *calEventsReader) sync() error { |
137 events, nextSyncToken, err := r.gc.ListEventUpdates(r.cal, r.syncToken) | |
138 if err != nil { | |
77 | 139 return fmt.Errorf("ListEventUpdates: %v", err) |
56
635ff76f867c
WIP: rewrite: process load+sync in parallel between cals; simplify a lot
drewp@bigasterisk.com
parents:
55
diff
changeset
|
140 } |
57 | 141 r.syncToken = nextSyncToken |
81
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
142 err = r.upsertAndDeleteEvents(events) |
78 | 143 if err != nil { |
81
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
144 return fmt.Errorf("upsertAndDeleteEvents: %v", err) |
57 | 145 } |
146 | |
49
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
147 return nil |
2991c1166852
start calsync in go. Calendar list seems to sync
drewp@bigasterisk.com
parents:
diff
changeset
|
148 } |
78 | 149 |
81
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
150 func (r *calEventsReader) upsertAndDeleteEvents(events []gcalclient.CalendarEvent) error { |
78 | 151 for _, ev := range events { |
81
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
152 if ev.Status == "cancelled" { |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
153 err := r.mc.DeleteEvent(ev.EventUrl) |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
154 if err != nil { |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
155 return err |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
156 } |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
157 continue |
79320eff10f2
support status=="cancelled" to delete events
drewp@bigasterisk.com
parents:
80
diff
changeset
|
158 } |
78 | 159 err := r.mc.UpsertOneEvent(convert.MongoEventFromGoogleEvent(&ev, time.Now() /*todo*/)) |
160 if err != nil { | |
161 return err | |
162 } | |
163 } | |
164 return nil | |
165 } |