Mercurial > code > home > repos > gcalendarwatch
diff gcalendarwatch @ 0:e40034f22c69
moved from pimscreen, upgrade to py3. Redo google auth.
Ignore-this: 43ada92a0639d288ca76e5486f6fa489
author | drewp@bigasterisk.com |
---|---|
date | Tue, 25 Jun 2019 17:08:27 -0700 |
parents | |
children | d77ead665ab2 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gcalendarwatch Tue Jun 25 17:08:27 2019 -0700 @@ -0,0 +1,392 @@ +#!/usr/bin/python3 + +""" +sync google calendar into mongodb, return queries from that as +JSON-LD. + +gcalendarwatch.conf looks like this: +{ + "minutes_between_polls" : 60 + "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}, +} + +This should be updated to use +http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html +and update faster with less polling +""" +import json, datetime, time, traceback, re, docopt + +from dateutil.parser import parse +from dateutil.tz import tzlocal +from googleapiclient import discovery +from googleapiclient.http import build_http +from pymongo import MongoClient +from rdflib import Namespace, Literal, Graph, URIRef, RDF +from twisted.internet import reactor +import cyclone.web +import oauth2client, oauth2client.file + +from standardservice.logsetup import log, verboseLogging +from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler + +EV = Namespace("http://bigasterisk.com/event#") + +""" +example: +{ + 'id': 'l640999999999999999999999c', + 'summary': 'sec.......', + 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'}, + 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'}, + 'endTimeUnspecified': True, + 'created': '2014-09-08T20:39:00.000Z', + 'creator': {'self': True, 'displayName': '...', 'email': '...'}, + 'etag': '"2829999999999000"', + 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt', + 'iCalUID': 'l640998888888888888888888888888888com', + 'kind': 'calendar#event', + 'organizer': {'self': True, 'displayName': '...', 'email': '...'}, + 'reminders': {'useDefault': True}, + 'sequence': 0, + 'status': 'confirmed', + 'updated': '2014-09-17T04:28:56.997Z', +}""" +def recordFromEv(conf, calId, ev): + def dateOrTime(d): + if 'date' in d: + return d['date'] + return d['dateTime'] + rec = { + 'uri': conf['event_uri_ns'] + ev['id'], + 'feed': conf['event_uri_ns'] + 'feed/' + calId, + 'title': ev['summary'], + 'start': dateOrTime(ev['start']), + 'end': dateOrTime(ev['end']), + 'endTimeUnspecified': ev.get('endTimeUnspecified', False), + 'htmlLink': ev['htmlLink'], + 'creatorEmail': ev['creator']['email'], + } + + for field, val in [('start', ev['start']), + ('end', ev['end'])]: + if 'date' in val: + rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) + rec['%sDate' % field] = val['date'] + else: + rec['%sTime' % field] = parse(val['dateTime']) + rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() + return rec + +def asJsonLd(events): + ret = {'@graph':[]} + for ev in events: + ev['startTime'] = ev['startTime'].astimezone(tzlocal()).isoformat() + ev['endTime'] = ev['endTime'].astimezone(tzlocal()).isoformat() + ev['@id'] = ev.pop('uri') + ret['@graph'].append(ev) + + ret['@context'] = { + "xsd": "http://www.w3.org/2001/XMLSchema#", + "ev": "http://bigasterisk.com/event#", + "startTime": {"@id": "ev:startTime", "@type": "xsd:dateTime"}, + "endTime": {"@id": "ev:endTime", "@type": "xsd:dateTime"}, + "startDate": {"@id": "ev:startDate", "@type": "xsd:date"}, + "endDate": {"@id": "ev:endDate", "@type": "xsd:date"}, + "title" : "ev:title", + "feed": {"@id": "ev:feed", "@type": "@id"}, + "htmlLink": {"@id": "ev:htmlLink", "@type": "@id"}, + } + return ret + +def asN3(events, conf): + return asGraph(events, conf).serialize(format='n3') + +def asGraph(events, conf): + graph = Graph() + graph.namespace_manager.bind('ev', EV) + for ev in events: + uri = URIRef(ev['uri']) + add = lambda p, o: graph.add((uri, p, o)) + add(RDF.type, EV['Event']) + add(EV['title'], Literal(ev['title'])) + add(EV['start'], Literal(ev['start'])) + add(EV['startDate'], Literal(ev['startDate'])) + add(EV['end'], Literal(ev['end'])) + add(EV['feed'], URIRef(ev['feed'])) + #graph.add((feed, RDFS.label, Literal(ev['feedTitle']))) + if 'htmlLink' in ev: + add(EV['htmlLink'], URIRef(ev['htmlLink'])) + return graph + + + +def getCalendarService(client_secrets='client_secret.json', + credential_storage='calemndar.dat', + scope='https://www.googleapis.com/auth/calendar.readonly', + name='calendar', + version='v3'): + """ + see + https://cloud.google.com/docs/authentication/end-user#creating_your_client_credentials + for getting client_secret.json . Use 'application type' of + 'other'. + """ + flow = oauth2client.client.flow_from_clientsecrets(client_secrets, scope=scope) + + storage = oauth2client.file.Storage(credential_storage) + credentials = storage.get() + if credentials is None or credentials.invalid: + class Flags: + logging_level = 'INFO' + noauth_local_webserver = True + credentials = oauth2client.tools.run_flow(flow, storage, Flags) + # (storage now writes back to calendar.dat) + http = credentials.authorize(http=build_http()) + + service = discovery.build(name, version, http=http) + return service + +def getFirstPageOfCalendars(service): + for row in service.calendarList().list().execute()['items']: + yield row['id'] + +def dayRange(days): + now = datetime.datetime.now(tzlocal()) + start = now + end = now + datetime.timedelta(days=days) + return start, end + +def limitDays(recs, days): + start, end = dayRange(days) + start = start - datetime.timedelta(hours=12) + # incomplete + return [r for r in recs if r['startTime'] < end and r['endTime'] > start] + +def starred(graph, ev): + title = graph.value(ev, EV['title']) + m = re.search(r'(.*)\*\s*$', title) + if m: + return m.group(1) + else: + return None + +def filterStarred(recs, maxCount=15): + recs = sorted(recs, key=lambda r: r['start']) + out = [] + for rec in recs: + if re.search(r'(.*)\*\s*$', rec['title']): + out.append(rec) + if len(out) >= maxCount: + break + return out + +class SyncToMongo(object): + """reads gcal, writes to mongodb""" + def __init__(self, conf, collection, agendaGraph, countdownGraph): + self.conf = conf + self.service = getCalendarService() + self.collection = collection + self.agendaGraph = agendaGraph + self.countdownGraph = countdownGraph + + def update(self, days=30*6): + start, end = dayRange(days) + self.removeEntries(start, end) + + currentRecords = [] + for calId in getFirstPageOfCalendars(self.service): + print('read %s' % calId) + events = self.service.events().list( + calendarId=calId, + singleEvents=True, + timeMin=start.isoformat(), + timeMax=end.isoformat(), + showDeleted=False, + maxResults=1000, + ).execute() + + for ev in events['items']: + rec = recordFromEv(self.conf, calId, ev) + self.upsertMongo(rec) + currentRecords.append(rec) + + self.updateGraphs(currentRecords) + + def removeEntries(self, start, end): + for doc in list(self.collection.find({"startTime":{"$gte":start, "$lte":end}})): + self.collection.remove({'_id':doc['_id']}) + + def upsertMongo(self, rec): + if self.collection.find_one({"_id" : rec['uri']}) is not None: + log.debug("existing record %s", rec['uri']) + # this is not yet noticing updates + return [] + else: + log.debug("new records %s", rec) + d = rec.copy() + d['_id'] = d.pop('uri') + self.collection.insert(d) + return [rec] + + def updateGraphs(self, currentRecords): + c = EV['gcalendar'] + currentRecords = list(currentRecords) + self.agendaGraph.setToGraph( + [(s,p,o,c) for s,p,o in asGraph(limitDays(currentRecords, days=2), self.conf)]) + self.countdownGraph.setToGraph( + [(s,p,o,c) for s,p,o in asGraph(filterStarred(currentRecords, maxCount=15), self.conf)]) + + +class ReadMongoEvents(object): + """read events from mongodb""" + def __init__(self, collection): + self.collection = collection + + def getEvents(self, t1, t2): + for doc in self.collection.find({"startTime" : {"$gte": t1, "$lt":t2} + }).sort([("startTime",1)]): + doc['uri'] = doc.pop('_id') + if 'feedId' in doc: + doc['feed'] = URIRef('old_event') + yield doc + + + +class Poller(object): + def __init__(self, sync, periodSec): + self.sync = sync + self.lastUpdateTime = time.time() + self.everUpdated = False + self.periodSec = periodSec + self.scheduled = reactor.callLater(self.periodSec, self._updateLoop) + self.events = None + + def updateNow(self): + self.scheduled.cancel() + self._updateLoop() + + def _updateLoop(self): + log.info("updating") + t1 = time.time() + try: + self.sync.update() + except Exception: + traceback.print_exc() + log.error("updated failed") + self.lastUpdateTime = t1 + self.everUpdated = True + took = time.time() - t1 + self.scheduled = reactor.callLater(max(3, self.periodSec - took), + self._updateLoop) + +class PollNow(cyclone.web.RequestHandler): + def post(self): + self.settings.poller.updateNow() + self.set_status(202) + +class Index(cyclone.web.RequestHandler): + def get(self): + period = self.settings.conf['minutes_between_polls'] * 60 + ago = time.time() - self.settings.poller.lastUpdateTime + if not self.settings.poller.everUpdated: + msg = "no completed updates %d sec after startup" % ago + if ago > period * 1.1: + raise ValueError(msg) + else: + msg = "last update was %d sec ago" % ago + if ago > period * 1.1: + raise ValueError(msg) + self.set_header("content-type", "text/html") + self.write(open("gcalendarwatch.html").read().replace("MSG", msg)) + + +class EventsPage(cyclone.web.RequestHandler): + def get(self): + """ + upcoming events as JSON-LD + """ + arg = self.get_argument + t1 = parse(arg('t1')) if arg('t1', default=None) else datetime.datetime.now().replace(hour=0, minute=0, second=0) + t2 = parse(arg('t2')) if arg('t2', default=None) else datetime.datetime.now() + datetime.timedelta(days=int(arg('days')) if arg('days', default=None) else 2) + if 0: + self.set_header("content-type", "application/ld+json") + self.write(asJsonLd(self.settings.read.getEvents(t1, t2))) + else: + self.set_header("content-type", "text/n3") + self.write(asN3(self.settings.read.getEvents(t1, t2), self.settings.conf)) + + +class Countdowns(cyclone.web.RequestHandler): + def get(self): + rows = [] + graph = self.settings.countdownGraph._graph + for ev in graph.subjects(RDF.type, EV['Event']): + starLabel = starred(graph, ev) + if starLabel is not None: + rows.append({'@type':'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel}) + self.set_header("content-type", "application/ld+json") + self.write(json.dumps({ + "@context": { + "countdown":"http://bigasterisk.com/countdown#CountdownEvent", + "label": "http://www.w3.org/2000/01/rdf-schema#label", + "time": { + "@id": "http://bigasterisk.com/event#time", + "@type": "xsd:dateTime" + }, + "xsd": "http://www.w3.org/2001/XMLSchema#", + "rdfs": "http://www.w3.org/2000/01/rdf-schema#" + }, + "@graph": rows, + })) + + +def main(): + args = docopt.docopt(''' +Usage: + gcalendarwatch [options] + +Options: + -v, --verbose more logging +''') + + verboseLogging(args['--verbose']) + + agendaGraph = PatchableGraph() # next few days + countdownGraph = PatchableGraph() # next n of starred events + conf = json.load(open("gcalendarwatch.conf")) + m = conf['mongo'] + mongoOut = MongoClient(m['host'], m['port'], + tz_aware=True)[m['database']][m['collection']] + sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph) + read = ReadMongoEvents(mongoOut) + + sync.updateGraphs(read.getEvents( + datetime.datetime.now().replace(hour=0, minute=0, second=0), + datetime.datetime.now() + datetime.timedelta(days=60))) + + poller = Poller(sync, conf['minutes_between_polls'] * 60) + + class Application(cyclone.web.Application): + def __init__(self): + handlers = [ + (r"/", Index), + (r'/events', EventsPage), + (r'/pollNow', PollNow), + (r'/graph', CycloneGraphHandler, {'masterGraph': agendaGraph}), + (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': agendaGraph}), + (r'/countdownGraph', CycloneGraphHandler, {'masterGraph': countdownGraph}), + (r'/countdownGraph/events', CycloneGraphEventsHandler, {'masterGraph': countdownGraph}), + (r'/countdowns.json', Countdowns), + ] + cyclone.web.Application.__init__(self, handlers, + conf=conf, + read=read, + poller=poller, + agendaGraph=agendaGraph, + countdownGraph=countdownGraph, + ) + reactor.listenTCP(conf['serve_port'], Application()) + reactor.run() + +if __name__ == '__main__': + main()