comparison gcalendarwatch @ 0:e40034f22c69

moved from pimscreen, upgrade to py3. Redo google auth. Ignore-this: 43ada92a0639d288ca76e5486f6fa489
author drewp@bigasterisk.com
date Tue, 25 Jun 2019 17:08:27 -0700
parents
children d77ead665ab2
comparison
equal deleted inserted replaced
-1:000000000000 0:e40034f22c69
1 #!/usr/bin/python3
2
3 """
4 sync google calendar into mongodb, return queries from that as
5 JSON-LD.
6
7 gcalendarwatch.conf looks like this:
8 {
9 "minutes_between_polls" : 60,
10 "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}
11 }
12
13 This should be updated to use
14 http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
15 and update faster with less polling
16 """
17 import json, datetime, time, traceback, re, docopt
18
19 from dateutil.parser import parse
20 from dateutil.tz import tzlocal
21 from googleapiclient import discovery
22 from googleapiclient.http import build_http
23 from pymongo import MongoClient
24 from rdflib import Namespace, Literal, Graph, URIRef, RDF
25 from twisted.internet import reactor
26 import cyclone.web
27 import oauth2client, oauth2client.file
28
29 from standardservice.logsetup import log, verboseLogging
30 from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
31
# RDF namespace for the event terms (EV['Event'], EV['title'], ...) used
# when building graphs in this file.
EV = Namespace("http://bigasterisk.com/event#")
33
34 """
35 example:
36 {
37 'id': 'l640999999999999999999999c',
38 'summary': 'sec.......',
39 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
40 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
41 'endTimeUnspecified': True,
42 'created': '2014-09-08T20:39:00.000Z',
43 'creator': {'self': True, 'displayName': '...', 'email': '...'},
44 'etag': '"2829999999999000"',
45 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
46 'iCalUID': 'l640998888888888888888888888888888com',
47 'kind': 'calendar#event',
48 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
49 'reminders': {'useDefault': True},
50 'sequence': 0,
51 'status': 'confirmed',
52 'updated': '2014-09-17T04:28:56.997Z',
53 }"""
def recordFromEv(conf, calId, ev):
    """
    Convert one Google Calendar event resource (see the example above)
    into the flat record dict used for mongodb and for the RDF graphs.

    conf: service config; conf['event_uri_ns'] prefixes event and feed URIs.
    calId: id of the calendar the event was read from.
    ev: event resource dict.

    Returns a dict with uri, feed, title, start, end, endTimeUnspecified,
    htmlLink and creatorEmail, plus startTime/endTime (datetime objects)
    and startDate/endDate (ISO date strings).

    Raises KeyError if ev has no 'id', 'start', 'end' or 'htmlLink'.
    """
    def dateOrTime(d):
        # all-day events carry 'date'; timed events carry 'dateTime'
        if 'date' in d:
            return d['date']
        return d['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': conf['event_uri_ns'] + 'feed/' + calId,
        # 'summary' can be absent (e.g. an event saved without a
        # title); one such event must not abort the whole sync.
        'title': ev.get('summary', ''),
        'start': dateOrTime(ev['start']),
        'end': dateOrTime(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev['htmlLink'],
        # creator or its email may be missing; store None then
        'creatorEmail': ev.get('creator', {}).get('email'),
    }

    for field in ('start', 'end'):
        val = ev[field]
        if 'date' in val:
            # a bare date has no zone of its own; use local midnight
            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
            rec['%sDate' % field] = val['date']
        else:
            when = parse(val['dateTime'])
            rec['%sTime' % field] = when
            rec['%sDate' % field] = when.date().isoformat()
    return rec
79
def asJsonLd(events):
    """
    Wrap event records in a JSON-LD document ('@graph' plus '@context').

    Each record is modified in place: startTime and endTime become ISO
    strings in the local timezone, and 'uri' is renamed to '@id'.
    """
    context = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": {"@id": "ev:startTime", "@type": "xsd:dateTime"},
        "endTime": {"@id": "ev:endTime", "@type": "xsd:dateTime"},
        "startDate": {"@id": "ev:startDate", "@type": "xsd:date"},
        "endDate": {"@id": "ev:endDate", "@type": "xsd:date"},
        "title" : "ev:title",
        "feed": {"@id": "ev:feed", "@type": "@id"},
        "htmlLink": {"@id": "ev:htmlLink", "@type": "@id"},
    }

    nodes = []
    for rec in events:
        for key in ('startTime', 'endTime'):
            rec[key] = rec[key].astimezone(tzlocal()).isoformat()
        rec['@id'] = rec.pop('uri')
        nodes.append(rec)

    return {'@graph': nodes, '@context': context}
100
def asN3(events, conf):
    """Serialize the event records as N3, going through asGraph."""
    graph = asGraph(events, conf)
    return graph.serialize(format='n3')
103
def asGraph(events, conf):
    """
    Build an rdflib Graph holding one EV:Event resource per record.

    events: iterable of record dicts with uri, title, start, startDate,
      end and feed keys (htmlLink is optional).
    conf: accepted but not used here.
    """
    result = Graph()
    result.namespace_manager.bind('ev', EV)
    for rec in events:
        subj = URIRef(rec['uri'])
        statements = [
            (RDF.type, EV['Event']),
            (EV['title'], Literal(rec['title'])),
            (EV['start'], Literal(rec['start'])),
            (EV['startDate'], Literal(rec['startDate'])),
            (EV['end'], Literal(rec['end'])),
            (EV['feed'], URIRef(rec['feed'])),
        ]
        # not emitted: an RDFS.label for the feed (would need ev['feedTitle'])
        if 'htmlLink' in rec:
            statements.append((EV['htmlLink'], URIRef(rec['htmlLink'])))
        for pred, obj in statements:
            result.add((subj, pred, obj))
    return result
120
121
122
def getCalendarService(client_secrets='client_secret.json',
                       credential_storage='calemndar.dat',
                       scope='https://www.googleapis.com/auth/calendar.readonly',
                       name='calendar',
                       version='v3'):
    """
    Return an authorized Google API client (by default calendar v3).

    see
    https://cloud.google.com/docs/authentication/end-user#creating_your_client_credentials
    for getting client_secret.json . Use 'application type' of
    'other'.

    Credentials are read from the credential_storage file. When none
    are stored, or the stored ones are invalid, the oauth flow is run.

    NOTE(review): the default 'calemndar.dat' looks like a typo of
    'calendar.dat'; it is left unchanged so credentials already stored
    under that name are still found.
    """
    # 'import oauth2client' does not load these submodules by itself, so
    # import them here rather than rely on another package having done it.
    from oauth2client import client, tools

    flow = client.flow_from_clientsecrets(client_secrets, scope=scope)

    storage = oauth2client.file.Storage(credential_storage)
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        # stands in for the argparse flags object that run_flow expects
        class Flags:
            logging_level = 'INFO'
            noauth_local_webserver = True
        credentials = tools.run_flow(flow, storage, Flags)
        # (storage now writes back to the credential_storage file)
    http = credentials.authorize(http=build_http())

    service = discovery.build(name, version, http=http)
    return service
148
def getFirstPageOfCalendars(service):
    """
    Yield the id of every calendar on the first page of the calendar
    list. Further pages are not requested.
    """
    firstPage = service.calendarList().list().execute()
    yield from (entry['id'] for entry in firstPage['items'])
152
def dayRange(days):
    """
    Return (start, end): the current local time, and that time plus
    the given number of days. Both are timezone-aware.
    """
    start = datetime.datetime.now(tzlocal())
    return start, start + datetime.timedelta(days=days)
158
def limitDays(recs, days):
    """
    Return the records that overlap the window running from 12 hours
    ago until `days` days from now.
    """
    windowStart, windowEnd = dayRange(days)
    windowStart = windowStart - datetime.timedelta(hours=12)
    # incomplete
    kept = []
    for rec in recs:
        if rec['startTime'] < windowEnd and rec['endTime'] > windowStart:
            kept.append(rec)
    return kept
164
def starred(graph, ev):
    """
    If the EV:title of ev in graph ends with '*' (optionally followed
    by whitespace), return the text in front of that star; otherwise
    return None.
    """
    title = graph.value(ev, EV['title'])
    found = re.search(r'(.*)\*\s*$', title)
    return found.group(1) if found else None
172
def filterStarred(recs, maxCount=15):
    """
    Return the first records, ordered by 'start', whose title ends with
    '*' (optionally followed by whitespace).

    recs: iterable of record dicts with 'start' and 'title' keys.
    maxCount: upper bound on the number of records returned; 0 or less
      gives an empty list.
    """
    # Without this guard the limit is only checked after an append, so
    # maxCount=0 could still return one record.
    if maxCount <= 0:
        return []
    out = []
    for rec in sorted(recs, key=lambda r: r['start']):
        if re.search(r'(.*)\*\s*$', rec['title']):
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out
182
class SyncToMongo(object):
    """reads gcal, writes to mongodb"""
    def __init__(self, conf, collection, agendaGraph, countdownGraph):
        """
        conf: service config dict (passed on to recordFromEv / asGraph).
        collection: mongodb collection that stores the event records.
        agendaGraph, countdownGraph: graphs refreshed by updateGraphs.
        """
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=30*6):
        """
        Drop the stored events that start within the next `days` days,
        re-read that range from every calendar on the first page of the
        calendar list, store the results and refresh both graphs.
        """
        start, end = dayRange(days)
        self.removeEntries(start, end)

        currentRecords = []
        for calId in getFirstPageOfCalendars(self.service):
            log.info('read %s', calId)
            for ev in self._listEvents(calId, start, end):
                rec = recordFromEv(self.conf, calId, ev)
                self.upsertMongo(rec)
                currentRecords.append(rec)

        self.updateGraphs(currentRecords)

    def _listEvents(self, calId, start, end):
        """Yield the events of calId between start and end, following all result pages."""
        pageToken = None
        while True:
            page = self.service.events().list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
                # None on the first request; afterwards the token from
                # the previous page
                pageToken=pageToken,
            ).execute()
            for ev in page.get('items', []):
                yield ev
            pageToken = page.get('nextPageToken')
            if not pageToken:
                break

    def removeEntries(self, start, end):
        """Delete the stored events whose startTime is within [start, end]."""
        # delete_many replaces the find + per-document remove();
        # Collection.remove is deprecated in PyMongo 3 and gone in 4.
        self.collection.delete_many({"startTime": {"$gte": start, "$lte": end}})

    def upsertMongo(self, rec):
        """
        Insert rec unless a document with its uri is already stored.

        Returns [rec] when it was inserted, [] when it already existed.
        """
        if self.collection.find_one({"_id" : rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("new records %s", rec)
            d = rec.copy()
            d['_id'] = d.pop('uri')
            # insert_one replaces the deprecated Collection.insert
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords):
        """
        Replace the contents of both graphs from currentRecords: the
        agenda graph gets the next 2 days, the countdown graph gets up
        to 15 starred events.
        """
        c = EV['gcalendar']
        # callers may pass a generator; it is iterated twice below
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph(
            [(s,p,o,c) for s,p,o in asGraph(limitDays(currentRecords, days=2), self.conf)])
        self.countdownGraph.setToGraph(
            [(s,p,o,c) for s,p,o in asGraph(filterStarred(currentRecords, maxCount=15), self.conf)])
238
239
class ReadMongoEvents(object):
    """read events from mongodb"""
    def __init__(self, collection):
        self.collection = collection

    def getEvents(self, t1, t2):
        """
        Yield the stored events with t1 <= startTime < t2, ordered by
        startTime. Each document's '_id' is renamed to 'uri'; documents
        that have a 'feedId' key get a placeholder 'feed' URI.
        """
        query = {"startTime": {"$gte": t1, "$lt": t2}}
        cursor = self.collection.find(query).sort([("startTime", 1)])
        for event in cursor:
            event['uri'] = event.pop('_id')
            if 'feedId' in event:
                event['feed'] = URIRef('old_event')
            yield event
252
253
254
class Poller(object):
    """Calls sync.update() repeatedly, roughly every periodSec seconds, via the reactor."""
    def __init__(self, sync, periodSec):
        """
        sync: object with an update() method.
        periodSec: target number of seconds between update starts.
        """
        self.sync = sync
        # startup time until an update completes, then the start time
        # of the last completed update
        self.lastUpdateTime = time.time()
        self.everUpdated = False
        self.periodSec = periodSec
        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
        self.events = None

    def updateNow(self):
        """Run an update right away in place of the pending scheduled one."""
        # cancel() raises on a call that already ran or was cancelled
        if self.scheduled.active():
            self.scheduled.cancel()
        self._updateLoop()

    def _updateLoop(self):
        """Run one update, then schedule the next one."""
        log.info("updating")
        t1 = time.time()
        try:
            self.sync.update()
        except Exception:
            traceback.print_exc()
            log.error("updated failed")
        else:
            # Only a completed update counts; recording failures here
            # would hide them from the staleness check in Index.
            self.lastUpdateTime = t1
            self.everUpdated = True
        took = time.time() - t1
        # wait at least 3 sec, even when the update overran the period
        self.scheduled = reactor.callLater(max(3, self.periodSec - took),
                                           self._updateLoop)
281
class PollNow(cyclone.web.RequestHandler):
    """POST here to make the poller run an update right away."""
    def post(self):
        poller = self.settings.poller
        poller.updateNow()
        # 202 Accepted
        self.set_status(202)
286
class Index(cyclone.web.RequestHandler):
    """Status page; it fails when updates have become stale."""
    def get(self):
        """
        Serve gcalendarwatch.html with MSG replaced by a status line.

        Raises ValueError when the last completed update (or startup,
        if none completed yet) is more than 1.1 poll periods ago.
        """
        period = self.settings.conf['minutes_between_polls'] * 60
        poller = self.settings.poller
        ago = time.time() - poller.lastUpdateTime
        if poller.everUpdated:
            msg = "last update was %d sec ago" % ago
        else:
            msg = "no completed updates %d sec after startup" % ago
        if ago > period * 1.1:
            raise ValueError(msg)
        self.set_header("content-type", "text/html")
        # 'with' closes the template file; it used to be left open
        with open("gcalendarwatch.html") as template:
            page = template.read()
        self.write(page.replace("MSG", msg))
301
302
class EventsPage(cyclone.web.RequestHandler):
    def get(self):
        """
        upcoming events as N3 (a JSON-LD variant exists but is disabled)

        Query args, all optional:
          t1: range start; default is the start of today, local time
          t2: range end; default is now + `days` days
          days: used only when t2 is absent; default 2
        """
        arg = self.get_argument
        now = datetime.datetime.now(tzlocal())

        def timeArg(name):
            """Parsed query arg, or None if absent/empty. Naive values are taken as local time."""
            raw = arg(name, default=None)
            if not raw:
                return None
            t = parse(raw)
            if t.tzinfo is None:
                t = t.replace(tzinfo=tzlocal())
            return t

        # The bounds must be timezone-aware: PyMongo treats naive
        # datetimes as UTC, which shifts the queried range by the local
        # UTC offset.
        t1 = timeArg('t1')
        if t1 is None:
            t1 = now.replace(hour=0, minute=0, second=0, microsecond=0)
        t2 = timeArg('t2')
        if t2 is None:
            days = int(arg('days', default=None) or 2)
            t2 = now + datetime.timedelta(days=days)

        events = self.settings.read.getEvents(t1, t2)
        # kept as an off switch, as before; N3 is what is served
        serveJsonLd = False
        if serveJsonLd:
            self.set_header("content-type", "application/ld+json")
            self.write(asJsonLd(events))
        else:
            self.set_header("content-type", "text/n3")
            self.write(asN3(events, self.settings.conf))
317
318
class Countdowns(cyclone.web.RequestHandler):
    def get(self):
        """
        Write a JSON-LD document with one 'countdown' row (time, label)
        for each starred event in the countdown graph.
        """
        graph = self.settings.countdownGraph._graph

        rows = []
        for ev in graph.subjects(RDF.type, EV['Event']):
            label = starred(graph, ev)
            if label is None:
                continue
            rows.append({'@type':'countdown', 'time': graph.value(ev, EV['start']), 'label': label})

        context = {
            "countdown":"http://bigasterisk.com/countdown#CountdownEvent",
            "label": "http://www.w3.org/2000/01/rdf-schema#label",
            "time": {
                "@id": "http://bigasterisk.com/event#time",
                "@type": "xsd:dateTime"
            },
            "xsd": "http://www.w3.org/2001/XMLSchema#",
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
        }
        self.set_header("content-type", "application/ld+json")
        body = json.dumps({"@context": context, "@graph": rows})
        self.write(body)
341
342
def main():
    """
    Read gcalendarwatch.conf, load stored events into the graphs, start
    the poller and serve HTTP on conf['serve_port'] until the reactor
    stops.
    """
    args = docopt.docopt('''
Usage:
  gcalendarwatch [options]

Options:
  -v, --verbose  more logging
''')

    verboseLogging(args['--verbose'])

    agendaGraph = PatchableGraph() # next few days
    countdownGraph = PatchableGraph() # next n of starred events
    # 'with' closes the config file; it used to be left open
    with open("gcalendarwatch.conf") as confFile:
        conf = json.load(confFile)
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'],
                           tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # Fill the graphs from what mongodb already holds, before the first
    # poll. The bounds are timezone-aware because PyMongo treats naive
    # datetimes as UTC.
    now = datetime.datetime.now(tzlocal())
    sync.updateGraphs(read.getEvents(
        now.replace(hour=0, minute=0, second=0, microsecond=0),
        now + datetime.timedelta(days=60)))

    poller = Poller(sync, conf['minutes_between_polls'] * 60)

    class Application(cyclone.web.Application):
        def __init__(self):
            handlers = [
                (r"/", Index),
                (r'/events', EventsPage),
                (r'/pollNow', PollNow),
                (r'/graph', CycloneGraphHandler, {'masterGraph': agendaGraph}),
                (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': agendaGraph}),
                (r'/countdownGraph', CycloneGraphHandler, {'masterGraph': countdownGraph}),
                (r'/countdownGraph/events', CycloneGraphEventsHandler, {'masterGraph': countdownGraph}),
                (r'/countdowns.json', Countdowns),
            ]
            cyclone.web.Application.__init__(self, handlers,
                                             conf=conf,
                                             read=read,
                                             poller=poller,
                                             agendaGraph=agendaGraph,
                                             countdownGraph=countdownGraph,
                                             )
    reactor.listenTCP(conf['serve_port'], Application())
    reactor.run()

if __name__ == '__main__':
    main()