Mercurial > code > home > repos > gcalendarwatch
view gcalendarwatch.py @ 20:8c7af0d1b118
format
author | drewp@bigasterisk.com |
---|---|
date | Thu, 29 Oct 2020 23:48:35 -0700 |
parents | 0af075b62e4a |
children | 9eb6b4806272 |
line wrap: on
line source
"""
Sync google calendar into mongodb, return queries from that as JSON-LD.

gcalendarwatch.conf looks like this (the code below also reads the
"event_uri_ns" and "serve_port" keys):

{
  "minutes_between_polls" : 60,
  "event_uri_ns" : "http://example.com/event/",
  "serve_port" : 9105,
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}
}

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import datetime
import json
import re
import time
import traceback

import cyclone.web
import pymongo.collection
from dateutil.parser import parse
from dateutil.tz import tzlocal
from patchablegraph import (
    CycloneGraphEventsHandler,
    CycloneGraphHandler,
    PatchableGraph,
)
from calendar_connection import getCalendarService
from typing import Dict, Any
from pymongo import MongoClient
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor
import docopt
from prometheus_client import Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY

UPDATE = Summary('gcalendarwatch_updates', 'update loop calls')
EV = Namespace("http://bigasterisk.com/event#")

# A title counts as "starred" when it ends with '*' (optionally followed by
# whitespace). Group 1 is the title text before the star.
_STARRED_TITLE = re.compile(r'(.*)\*\s*$')

"""
example:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""


def feedFromCalId(conf, calId):
    """Return the feed URI string for a calendar id, under conf['event_uri_ns']."""
    return conf['event_uri_ns'] + 'feed/' + calId


def recordFromEv(conf: Dict, calId: str, ev: Dict):
    """Convert one calendar API event dict (see example above) to our record dict.

    The record keeps the raw 'start'/'end' strings and adds parsed
    'startTime'/'endTime' datetimes plus 'startDate'/'endDate' ISO date
    strings.
    """

    def dateOrTime(d):
        # the event carries either 'date' or 'dateTime'; prefer 'date'
        if 'date' in d:
            return d['date']
        return d['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': dateOrTime(ev['start']),
        'end': dateOrTime(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for field, val in [('start', ev['start']), ('end', ev['end'])]:
        if 'date' in val:
            # a bare date has no zone of its own; stamp it with the local zone
            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
            rec['%sDate' % field] = val['date']
        else:
            when = parse(val['dateTime'])
            rec['%sTime' % field] = when
            rec['%sDate' % field] = when.date().isoformat()
    return rec


def asJsonLd(events):
    """Build a JSON-LD document ({'@graph': [...], '@context': {...}}) from records.

    Note: each record dict is modified in place ('uri' becomes '@id', and
    the start/end datetimes become local-zone ISO strings).
    """
    ret = {'@graph': []}  # type: Dict[Any, Any]
    for ev in events:
        ev['startTime'] = ev['startTime'].astimezone(tzlocal()).isoformat()
        ev['endTime'] = ev['endTime'].astimezone(tzlocal()).isoformat()
        ev['@id'] = ev.pop('uri')
        ret['@graph'].append(ev)

    ret['@context'] = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": {
            "@id": "ev:startTime",
            "@type": "xsd:dateTime"
        },
        "endTime": {
            "@id": "ev:endTime",
            "@type": "xsd:dateTime"
        },
        "startDate": {
            "@id": "ev:startDate",
            "@type": "xsd:date"
        },
        "endDate": {
            "@id": "ev:endDate",
            "@type": "xsd:date"
        },
        "title": "ev:title",
        "feed": {
            "@id": "ev:feed",
            "@type": "@id"
        },
        "htmlLink": {
            "@id": "ev:htmlLink",
            "@type": "@id"
        },
    }
    return ret


def asGraph(events, extraClasses=()):
    """Build an rdflib Graph describing the given records.

    Every event gets rdf:type ev:Event plus each class in extraClasses.
    (The default is an immutable tuple rather than a shared mutable list.)
    """
    graph = Graph()
    graph.namespace_manager.bind('ev', EV)
    for ev in events:
        uri = URIRef(ev['uri'])

        def add(p, o):
            return graph.add((uri, p, o))

        add(RDF.type, EV['Event'])
        for cls in extraClasses:
            add(RDF.type, cls)
        add(EV['title'], Literal(ev['title']))
        add(EV['start'], Literal(ev['start']))
        add(EV['startDate'], Literal(ev['startDate']))
        add(EV['end'], Literal(ev['end']))
        add(EV['feed'], URIRef(ev['feed']))
        # graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
        if 'htmlLink' in ev:
            add(EV['htmlLink'], URIRef(ev['htmlLink']))
    return graph


def getFirstPageOfCalendars(service):
    """Yield the id of each calendar on the first page of the calendar list."""
    for row in service.calendarList().list().execute()['items']:
        yield row['id']


def dayRange(days):
    """Return (start, end) local-zone datetimes: 12 hours ago until `days` days from now."""
    now = datetime.datetime.now(tzlocal())
    start = now - datetime.timedelta(hours=12)
    end = now + datetime.timedelta(days=days)
    return start, end


def limitDays(recs, days):
    """Return the records whose [startTime, endTime] overlaps the dayRange window.

    The window start is pushed back a further 12 hours beyond dayRange's.
    """
    start, end = dayRange(days)
    start = start - datetime.timedelta(hours=12)
    # incomplete
    return [r for r in recs if r['startTime'] < end and r['endTime'] > start]


def starred(graph, ev):
    """Return the title of `ev` with its trailing star removed, or None.

    None is returned both when the title is not starred and when the graph
    holds no ev:title for this event.
    """
    title = graph.value(ev, EV['title'])
    if title is None:
        # re.search would raise TypeError on None
        return None
    m = _STARRED_TITLE.search(title)
    if m:
        return m.group(1)
    else:
        return None


def filterStarred(recs, maxCount=15):
    """Return up to maxCount starred records, ordered by their 'start' value."""
    recs = sorted(recs, key=lambda r: r['start'])
    out = []
    for rec in recs:
        if _STARRED_TITLE.search(rec['title']):
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out


class SyncToMongo(object):
    """reads gcal, writes to mongodb"""
    collection: pymongo.collection.Collection

    def __init__(self, conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
                 countdownGraph: PatchableGraph):
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    @UPDATE.time()
    def update(self, days=10, cal=None) -> int:
        """Re-read events in the dayRange(days) window and refresh mongo and the graphs.

        If `cal` is given, only that calendar id is cleared and re-read.
        Returns how many fetched events were not in the window before this
        update.
        """
        start, end = dayRange(days)
        spec = {"startTime": {"$gte": start, "$lte": end}}  # type: Dict[str, Any]
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # a set, since every fetched event is tested for membership below
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        deleted = self.collection.delete_many(spec)
        # delete_many returns a result object; the count is an attribute of it
        log.info(f'cleared {deleted.deleted_count} records before reread')

        totalNew = 0
        currentRecords = []
        for calId in getFirstPageOfCalendars(self.service):
            if cal and calId != cal:
                continue
            log.info('read %s', calId)
            # NOTE(review): only one page of up to maxResults events is read;
            # no follow-up page is requested.
            events = self.service.events().list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
            ).execute()

            for ev in events['items']:
                rec = recordFromEv(self.conf, calId, ev)
                self.upsertMongo(rec)
                if rec['uri'] not in idsFormerlyInRange:
                    totalNew += 1
                currentRecords.append(rec)

        self.updateGraphs(currentRecords)
        return totalNew

    def upsertMongo(self, rec):
        """Insert `rec` unless a document with its uri as _id already exists.

        Returns [rec] when a document was inserted, [] otherwise.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = rec.copy()
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords):
        """Replace the agenda and countdown graph contents from these records."""
        c = EV['gcalendar']
        # may be a generator; it is iterated twice below
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(limitDays(currentRecords, days=2))])
        self.countdownGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(filterStarred(currentRecords, maxCount=15),
                                                                            extraClasses=[EV['CountdownEvent']])])


class ReadMongoEvents(object):
    """read events from mongodb"""

    def __init__(self, collection):
        self.collection = collection

    def getEvents(self, t1, t2):
        """Yield stored records with t1 <= startTime < t2, ordered by startTime."""
        for doc in self.collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]):
            doc['uri'] = doc.pop('_id')
            if 'feedId' in doc:
                # presumably documents written by an older schema; verify
                doc['feed'] = URIRef('old_event')
            yield doc


class Poller(object):
    """Runs sync.update() every periodSec seconds on the twisted reactor."""

    def __init__(self, sync, periodSec):
        self.sync = sync
        self.lastUpdateTime = time.time()
        self.everUpdated = False
        self.periodSec = periodSec
        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
        self.events = None

    def updateNow(self, cal=None) -> int:
        """Cancel the pending scheduled run and update immediately.

        Returns the count from sync.update (0 if it failed).
        """
        # cancel() raises if the call has already run or been cancelled
        if self.scheduled.active():
            self.scheduled.cancel()
        return self._updateLoop(cal)

    def _updateLoop(self, cal=None) -> int:
        """Run one update, then schedule the next run; a failure is logged and counts as 0."""
        log.info(f"updating {cal or 'all'}")
        t1 = time.time()
        try:
            n = self.sync.update(cal=cal)
        except Exception:
            traceback.print_exc()
            log.error("updated failed")
            n = 0
        # note: these are recorded even when the update above failed
        self.lastUpdateTime = t1
        self.everUpdated = True
        took = time.time() - t1
        # wait at least 3 seconds before the next run
        self.scheduled = reactor.callLater(max(3, self.periodSec - took), self._updateLoop)
        return n
class PollNow(cyclone.web.RequestHandler):
    """POST: run an update right away."""

    def post(self):
        """Update now, optionally limited to the 'cal' id in the JSON request body."""
        cal = json.loads(self.request.body).get('cal', None) if self.request.body else None
        n = self.settings.poller.updateNow(cal)
        msg = f"found {n} new records"
        log.info(msg)
        self.write(msg.encode('utf8'))


class Index(cyclone.web.RequestHandler):
    """Status page; raises when the last update is older than expected."""

    def get(self):
        """Serve index.html with its MSG placeholder replaced by the poll status.

        Raises ValueError when the time since the last update (or since
        startup, if none has completed) exceeds 1.1 poll periods.
        """
        period = self.settings.conf['minutes_between_polls'] * 60
        ago = time.time() - self.settings.poller.lastUpdateTime
        if not self.settings.poller.everUpdated:
            msg = "no completed updates %d sec after startup" % ago
        else:
            msg = "last update was %d sec ago" % ago
        if ago > period * 1.1:
            raise ValueError(msg)
        self.set_header("content-type", "text/html")
        # close the template file promptly instead of leaking the handle
        with open("index.html") as f:
            page = f.read()
        self.write(page.replace("MSG", msg))


class EventsPage(cyclone.web.RequestHandler):

    def get(self):
        """
        upcoming events as N3 (JSON-LD output is available behind a switch)

        Query args: t1 and t2 bound the startTime range. t1 defaults to
        today's midnight; t2 defaults to now plus 'days' days (2 if 'days'
        is not given).
        """
        arg = self.get_argument
        if arg('t1', default=None):
            t1 = parse(arg('t1'))
        else:
            t1 = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        if arg('t2', default=None):
            t2 = parse(arg('t2'))
        else:
            days = int(arg('days')) if arg('days', default=None) else 2
            t2 = datetime.datetime.now() + datetime.timedelta(days=days)

        # flip to serve JSON-LD instead of N3
        useJsonLd = False
        if useJsonLd:
            self.set_header("content-type", "application/ld+json")
            self.write(asJsonLd(self.settings.read.getEvents(t1, t2)))
        else:
            self.set_header("content-type", "text/n3")
            self.write(asGraph(self.settings.read.getEvents(t1, t2)).serialize(format='n3'))


class Countdowns(cyclone.web.RequestHandler):
    """Starred events from the countdown graph, as JSON-LD."""

    def get(self):
        """Write one 'countdown' row (time, label) per starred event in the countdown graph."""
        rows = []
        graph = self.settings.countdownGraph._graph
        for ev in graph.subjects(RDF.type, EV['Event']):
            starLabel = starred(graph, ev)
            if starLabel is not None:
                rows.append({'@type': 'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel})
        self.set_header("content-type", "application/ld+json")
        self.write(
            json.dumps({
                "@context": {
                    "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
                    "label": "http://www.w3.org/2000/01/rdf-schema#label",
                    "time": {
                        "@id": "http://bigasterisk.com/event#time",
                        "@type": "xsd:dateTime"
                    },
                    "xsd": "http://www.w3.org/2001/XMLSchema#",
                    "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
                },
                "@graph": rows,
            }))


class Metrics(cyclone.web.RequestHandler):
    """Prometheus metrics in text exposition format."""

    def get(self):
        # set_header replaces the content-type; add_header would append a
        # second header next to the existing one
        self.set_header('content-type', 'text/plain')
        self.write(generate_latest(REGISTRY))


def main():
    """Parse args, load gcalendarwatch.conf, start the poller and the web server."""
    args = docopt.docopt('''
Usage:
  gcalendarwatch [options]

Options:
  -v, --verbose  more logging
  --now          don't wait for first update
''')

    verboseLogging(args['--verbose'])

    agendaGraph = PatchableGraph()  # next few days
    countdownGraph = PatchableGraph()  # next n of starred events
    with open("gcalendarwatch.conf") as confFile:
        conf = json.load(confFile)
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # fill the graphs from what mongo already has, before the first poll
    s, e = dayRange(60)
    sync.updateGraphs(read.getEvents(s, e))

    poller = Poller(sync, conf['minutes_between_polls'] * 60)

    if args['--now']:
        poller.updateNow()

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                (r"/", Index),
                (r'/events', EventsPage),
                (r'/pollNow', PollNow),
                (r'/graph/calendar/upcoming', CycloneGraphHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/upcoming/events', CycloneGraphEventsHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/countdown', CycloneGraphHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/graph/calendar/countdown/events', CycloneGraphEventsHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/countdowns.json', Countdowns),
                (r'/metrics', Metrics),
            ]
            cyclone.web.Application.__init__(
                self,
                handlers,
                conf=conf,
                read=read,
                poller=poller,
                agendaGraph=agendaGraph,
                countdownGraph=countdownGraph,
            )

    reactor.listenTCP(conf['serve_port'], Application())
    reactor.run()


if __name__ == '__main__':
    main()