diff gcalendarwatch @ 0:e40034f22c69

moved from pimscreen, upgrade to py3. Redo google auth. Ignore-this: 43ada92a0639d288ca76e5486f6fa489
author drewp@bigasterisk.com
date Tue, 25 Jun 2019 17:08:27 -0700
parents
children d77ead665ab2
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/gcalendarwatch	Tue Jun 25 17:08:27 2019 -0700
@@ -0,0 +1,392 @@
+#!/usr/bin/python3
+
+"""
+sync google calendar into mongodb, return queries from that as
+JSON-LD.
+
+gcalendarwatch.conf looks like this:
+{
+  "minutes_between_polls" : 60
+  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"},
+}
+
+This should be updated to use
+http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
+and update faster with less polling
+"""
+import json, datetime, time, traceback, re, docopt
+
+from dateutil.parser import parse
+from dateutil.tz import tzlocal
+from googleapiclient import discovery 
+from googleapiclient.http import build_http 
+from pymongo import MongoClient
+from rdflib import Namespace, Literal, Graph, URIRef, RDF
+from twisted.internet import reactor
+import cyclone.web
+import oauth2client, oauth2client.file
+
+from standardservice.logsetup import log, verboseLogging
+from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler
+
+EV = Namespace("http://bigasterisk.com/event#")
+
+"""
+example:
+{
+ 'id': 'l640999999999999999999999c',
+ 'summary': 'sec.......',
+ 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
+ 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
+ 'endTimeUnspecified': True,
+ 'created': '2014-09-08T20:39:00.000Z',
+ 'creator': {'self': True, 'displayName': '...', 'email': '...'},
+ 'etag': '"2829999999999000"',
+ 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
+ 'iCalUID': 'l640998888888888888888888888888888com',
+ 'kind': 'calendar#event',
+ 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
+ 'reminders': {'useDefault': True},
+ 'sequence': 0,
+ 'status': 'confirmed',
+ 'updated': '2014-09-17T04:28:56.997Z',
+}"""
+def recordFromEv(conf, calId, ev):
+    def dateOrTime(d):
+        if 'date' in d:
+            return d['date']
+        return d['dateTime']
+    rec = {
+        'uri': conf['event_uri_ns'] + ev['id'],
+        'feed': conf['event_uri_ns'] + 'feed/' + calId,
+        'title': ev['summary'],
+        'start': dateOrTime(ev['start']),
+        'end': dateOrTime(ev['end']),
+        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
+        'htmlLink': ev['htmlLink'],
+        'creatorEmail': ev['creator']['email'],
+        }
+
+    for field, val in [('start', ev['start']),
+                       ('end', ev['end'])]:
+        if 'date' in val:
+            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
+            rec['%sDate' % field] = val['date']
+        else:
+            rec['%sTime' % field] = parse(val['dateTime'])
+            rec['%sDate' % field] = parse(val['dateTime']).date().isoformat()
+    return rec
+
+def asJsonLd(events):
+    ret = {'@graph':[]}
+    for ev in events:
+        ev['startTime'] = ev['startTime'].astimezone(tzlocal()).isoformat()
+        ev['endTime'] = ev['endTime'].astimezone(tzlocal()).isoformat()
+        ev['@id'] = ev.pop('uri')
+        ret['@graph'].append(ev)
+
+    ret['@context'] = {
+        "xsd": "http://www.w3.org/2001/XMLSchema#",
+        "ev": "http://bigasterisk.com/event#",
+        "startTime": {"@id": "ev:startTime", "@type": "xsd:dateTime"},
+        "endTime": {"@id": "ev:endTime", "@type": "xsd:dateTime"},
+        "startDate": {"@id": "ev:startDate", "@type": "xsd:date"},
+        "endDate": {"@id": "ev:endDate", "@type": "xsd:date"},
+        "title" : "ev:title",
+        "feed": {"@id": "ev:feed", "@type": "@id"},
+        "htmlLink": {"@id": "ev:htmlLink", "@type": "@id"},
+    }
+    return ret
+
+def asN3(events, conf):
+    return asGraph(events, conf).serialize(format='n3')
+    
+def asGraph(events, conf):
+    graph = Graph()
+    graph.namespace_manager.bind('ev', EV)
+    for ev in events:
+        uri = URIRef(ev['uri'])
+        add = lambda p, o: graph.add((uri, p, o))
+        add(RDF.type, EV['Event'])
+        add(EV['title'], Literal(ev['title']))
+        add(EV['start'], Literal(ev['start']))
+        add(EV['startDate'], Literal(ev['startDate']))
+        add(EV['end'], Literal(ev['end']))
+        add(EV['feed'], URIRef(ev['feed']))
+        #graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
+        if 'htmlLink' in ev:
+            add(EV['htmlLink'], URIRef(ev['htmlLink']))
+    return graph
+
+
+
+def getCalendarService(client_secrets='client_secret.json',
+                       credential_storage='calemndar.dat',
+                       scope='https://www.googleapis.com/auth/calendar.readonly',
+                       name='calendar',
+                       version='v3'):
+    """
+    see
+    https://cloud.google.com/docs/authentication/end-user#creating_your_client_credentials
+    for getting client_secret.json . Use 'application type' of
+    'other'.
+    """
+    flow = oauth2client.client.flow_from_clientsecrets(client_secrets, scope=scope)
+   
+    storage = oauth2client.file.Storage(credential_storage) 
+    credentials = storage.get() 
+    if credentials is None or credentials.invalid:
+        class Flags:
+            logging_level = 'INFO'
+            noauth_local_webserver = True
+        credentials = oauth2client.tools.run_flow(flow, storage, Flags)
+        # (storage now writes back to calendar.dat)
+    http = credentials.authorize(http=build_http()) 
+   
+    service = discovery.build(name, version, http=http) 
+    return service
+
+def getFirstPageOfCalendars(service):
+    for row in service.calendarList().list().execute()['items']:
+        yield row['id']
+
+def dayRange(days):
+    now = datetime.datetime.now(tzlocal())
+    start = now
+    end = now + datetime.timedelta(days=days)
+    return start, end
+        
+def limitDays(recs, days):
+    start, end = dayRange(days)
+    start = start - datetime.timedelta(hours=12)
+    # incomplete
+    return [r for r in recs if r['startTime'] < end and r['endTime'] > start]
+
+def starred(graph, ev):
+    title = graph.value(ev, EV['title'])
+    m = re.search(r'(.*)\*\s*$', title)
+    if m:
+        return m.group(1)
+    else:
+        return None
+
+def filterStarred(recs, maxCount=15):
+    recs = sorted(recs, key=lambda r: r['start'])
+    out = []
+    for rec in recs:
+        if re.search(r'(.*)\*\s*$', rec['title']):
+            out.append(rec)
+            if len(out) >= maxCount:
+                break
+    return out
+        
+class SyncToMongo(object):
+    """reads gcal, writes to mongodb"""
+    def __init__(self, conf, collection, agendaGraph, countdownGraph):
+        self.conf = conf
+        self.service = getCalendarService()
+        self.collection = collection
+        self.agendaGraph = agendaGraph
+        self.countdownGraph = countdownGraph
+
+    def update(self, days=30*6):
+        start, end = dayRange(days)
+        self.removeEntries(start, end)
+
+        currentRecords = []
+        for calId in getFirstPageOfCalendars(self.service):
+            print('read %s' % calId)
+            events = self.service.events().list(
+                calendarId=calId,
+                singleEvents=True,
+                timeMin=start.isoformat(),
+                timeMax=end.isoformat(),
+                showDeleted=False,
+                maxResults=1000,
+            ).execute()
+            
+            for ev in events['items']:
+                rec = recordFromEv(self.conf, calId, ev)
+                self.upsertMongo(rec)
+                currentRecords.append(rec)
+
+        self.updateGraphs(currentRecords)
+        
+    def removeEntries(self, start, end):
+        for doc in list(self.collection.find({"startTime":{"$gte":start, "$lte":end}})):
+            self.collection.remove({'_id':doc['_id']})
+
+    def upsertMongo(self, rec):
+        if self.collection.find_one({"_id" : rec['uri']}) is not None:
+            log.debug("existing record %s", rec['uri'])
+            # this is not yet noticing updates
+            return []
+        else:
+            log.debug("new records %s", rec)
+            d = rec.copy()
+            d['_id'] = d.pop('uri')
+            self.collection.insert(d)
+            return [rec]
+
+    def updateGraphs(self, currentRecords):
+        c = EV['gcalendar']
+        currentRecords = list(currentRecords)
+        self.agendaGraph.setToGraph(
+            [(s,p,o,c) for s,p,o in asGraph(limitDays(currentRecords, days=2), self.conf)])
+        self.countdownGraph.setToGraph(
+            [(s,p,o,c) for s,p,o in asGraph(filterStarred(currentRecords, maxCount=15), self.conf)])
+        
+
+class ReadMongoEvents(object):
+    """read events from mongodb"""
+    def __init__(self, collection):
+        self.collection = collection
+
+    def getEvents(self, t1, t2):
+        for doc in self.collection.find({"startTime" : {"$gte": t1, "$lt":t2}
+                                         }).sort([("startTime",1)]):
+            doc['uri'] = doc.pop('_id')
+            if 'feedId' in doc:
+                doc['feed'] = URIRef('old_event')
+            yield doc
+
+
+
+class Poller(object):
+    def __init__(self, sync, periodSec):
+        self.sync = sync
+        self.lastUpdateTime = time.time()
+        self.everUpdated = False
+        self.periodSec = periodSec
+        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
+        self.events = None
+
+    def updateNow(self):
+        self.scheduled.cancel()
+        self._updateLoop()
+
+    def _updateLoop(self):
+        log.info("updating")
+        t1 = time.time()
+        try:
+            self.sync.update()
+        except Exception:
+            traceback.print_exc()
+            log.error("updated failed")
+        self.lastUpdateTime = t1
+        self.everUpdated = True
+        took = time.time() - t1
+        self.scheduled = reactor.callLater(max(3, self.periodSec - took),
+                                            self._updateLoop)
+
+class PollNow(cyclone.web.RequestHandler):
+    def post(self):
+        self.settings.poller.updateNow()
+        self.set_status(202)
+
+class Index(cyclone.web.RequestHandler):
+    def get(self):
+        period = self.settings.conf['minutes_between_polls'] * 60
+        ago = time.time() - self.settings.poller.lastUpdateTime
+        if not self.settings.poller.everUpdated:
+            msg = "no completed updates %d sec after startup" % ago
+            if ago > period * 1.1:
+                raise ValueError(msg)
+        else:
+            msg = "last update was %d sec ago" % ago
+            if ago > period * 1.1:
+                raise ValueError(msg)
+        self.set_header("content-type", "text/html")
+        self.write(open("gcalendarwatch.html").read().replace("MSG", msg))
+
+
+class EventsPage(cyclone.web.RequestHandler):
+    def get(self):
+        """
+        upcoming events as JSON-LD
+        """
+        arg = self.get_argument
+        t1 = parse(arg('t1')) if arg('t1', default=None) else datetime.datetime.now().replace(hour=0, minute=0, second=0)
+        t2 = parse(arg('t2')) if arg('t2', default=None) else datetime.datetime.now() + datetime.timedelta(days=int(arg('days')) if arg('days', default=None) else 2)
+        if 0:
+            self.set_header("content-type", "application/ld+json")
+            self.write(asJsonLd(self.settings.read.getEvents(t1, t2)))
+        else:
+            self.set_header("content-type", "text/n3")
+            self.write(asN3(self.settings.read.getEvents(t1, t2), self.settings.conf))
+    
+            
+class Countdowns(cyclone.web.RequestHandler):
+    def get(self):
+        rows = []
+        graph = self.settings.countdownGraph._graph
+        for ev in graph.subjects(RDF.type, EV['Event']):
+            starLabel = starred(graph, ev)
+            if starLabel is not None:
+                rows.append({'@type':'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel})
+        self.set_header("content-type", "application/ld+json")
+        self.write(json.dumps({
+            "@context": {
+                "countdown":"http://bigasterisk.com/countdown#CountdownEvent",
+                "label": "http://www.w3.org/2000/01/rdf-schema#label",
+                "time": {
+                    "@id": "http://bigasterisk.com/event#time",
+                    "@type": "xsd:dateTime"
+                },
+                "xsd": "http://www.w3.org/2001/XMLSchema#",
+                "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
+            },
+            "@graph": rows,
+            }))
+
+            
+def main():
+    args = docopt.docopt('''
+Usage:
+  gcalendarwatch [options]
+
+Options:
+  -v, --verbose  more logging
+''')
+
+    verboseLogging(args['--verbose'])
+
+    agendaGraph = PatchableGraph() # next few days
+    countdownGraph = PatchableGraph() # next n of starred events
+    conf = json.load(open("gcalendarwatch.conf"))
+    m = conf['mongo']
+    mongoOut = MongoClient(m['host'], m['port'],
+                           tz_aware=True)[m['database']][m['collection']]
+    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
+    read = ReadMongoEvents(mongoOut)
+
+    sync.updateGraphs(read.getEvents(
+        datetime.datetime.now().replace(hour=0, minute=0, second=0),
+        datetime.datetime.now() + datetime.timedelta(days=60)))
+    
+    poller = Poller(sync, conf['minutes_between_polls'] * 60)
+
+    class Application(cyclone.web.Application):
+        def __init__(self):
+            handlers = [
+                (r"/", Index),
+                (r'/events', EventsPage),
+                (r'/pollNow', PollNow),
+                (r'/graph', CycloneGraphHandler, {'masterGraph': agendaGraph}),
+                (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': agendaGraph}),
+                (r'/countdownGraph', CycloneGraphHandler, {'masterGraph': countdownGraph}),
+                (r'/countdownGraph/events', CycloneGraphEventsHandler, {'masterGraph': countdownGraph}),
+                (r'/countdowns.json', Countdowns),
+            ]
+            cyclone.web.Application.__init__(self, handlers,
+                                             conf=conf,
+                                             read=read,
+                                             poller=poller,
+                                             agendaGraph=agendaGraph,
+                                             countdownGraph=countdownGraph,
+            )
+    reactor.listenTCP(conf['serve_port'], Application())
+    reactor.run()
+
+if __name__ == '__main__':
+    main()