view gcalendarwatch.py @ 11:702f74c3d742

even more optional fields
author drewp@bigasterisk.com
date Tue, 04 Feb 2020 00:19:04 -0800
parents c3f5b0236d0f
children d89a14f4d3ff
line wrap: on
line source

#!/usr/bin/python3

"""
sync google calendar into mongodb, return queries from that as
JSON-LD.

gcalendarwatch.conf is JSON and looks like this:
{
  "minutes_between_polls" : 60,
  "event_uri_ns" : "http://example.com/event/",
  "serve_port" : 8080,
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}
}

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import json, datetime, time, traceback, re, docopt

from dateutil.parser import parse
from dateutil.tz import tzlocal
from googleapiclient import discovery
from googleapiclient.http import build_http
from pymongo import MongoClient
from rdflib import Namespace, Literal, Graph, URIRef, RDF
from twisted.internet import reactor
import cyclone.web
import oauth2client, oauth2client.file

from standardservice.logsetup import log, verboseLogging
from patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler

# RDF namespace for the event vocabulary used below (EV['Event'],
# EV['title'], EV['start'], ...) and as the graph context EV['gcalendar'].
EV = Namespace("http://bigasterisk.com/event#")

"""
example:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""
def recordFromEv(conf, calId, ev):
    """
    Build the flat record dict (the shape stored in mongo) for one
    event dict returned by the calendar API.

    conf supplies 'event_uri_ns', the prefix for the event and feed
    URIs. 'start'/'end' keep the raw API string (the 'date' value if
    present, else 'dateTime'); '<field>Time' is the parsed datetime
    and '<field>Date' is the ISO date string. Date-only values are
    given the local timezone.
    """
    def rawValue(when):
        return when['date'] if 'date' in when else when['dateTime']

    record = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': conf['event_uri_ns'] + 'feed/' + calId,
        'title': ev.get('summary', '?'),
        'start': rawValue(ev['start']),
        'end': rawValue(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for prefix, when in (('start', ev['start']), ('end', ev['end'])):
        timeKey = '%sTime' % prefix
        dateKey = '%sDate' % prefix
        if 'date' not in when:
            stamp = parse(when['dateTime'])
            record[timeKey] = stamp
            record[dateKey] = stamp.date().isoformat()
            continue
        # date-only value: interpret it in the local timezone
        record[timeKey] = parse(when['date']).replace(tzinfo=tzlocal())
        record[dateKey] = when['date']
    return record

def asJsonLd(events):
    """
    Wrap event records in a JSON-LD document: {'@graph': [...],
    '@context': {...}}.

    Each record is modified in place: startTime/endTime become
    local-timezone ISO strings and 'uri' is renamed to '@id'.
    """
    nodes = []
    for node in events:
        for key in ('startTime', 'endTime'):
            node[key] = node[key].astimezone(tzlocal()).isoformat()
        node['@id'] = node.pop('uri')
        nodes.append(node)

    context = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": {"@id": "ev:startTime", "@type": "xsd:dateTime"},
        "endTime": {"@id": "ev:endTime", "@type": "xsd:dateTime"},
        "startDate": {"@id": "ev:startDate", "@type": "xsd:date"},
        "endDate": {"@id": "ev:endDate", "@type": "xsd:date"},
        "title" : "ev:title",
        "feed": {"@id": "ev:feed", "@type": "@id"},
        "htmlLink": {"@id": "ev:htmlLink", "@type": "@id"},
    }
    return {'@graph': nodes, '@context': context}

def asGraph(events, extraClasses=()):
    """
    Return an rdflib Graph describing the given event records.

    events: iterable of record dicts with 'uri', 'title', 'start',
      'startDate', 'end' and 'feed' keys, plus an optional 'htmlLink'.
    extraClasses: additional rdf:type values given to every event,
      besides ev:Event.
    """
    graph = Graph()
    graph.namespace_manager.bind('ev', EV)
    for ev in events:
        uri = URIRef(ev['uri'])
        graph.add((uri, RDF.type, EV['Event']))
        for cls in extraClasses:
            graph.add((uri, RDF.type, cls))
        graph.add((uri, EV['title'], Literal(ev['title'])))
        graph.add((uri, EV['start'], Literal(ev['start'])))
        graph.add((uri, EV['startDate'], Literal(ev['startDate'])))
        graph.add((uri, EV['end'], Literal(ev['end'])))
        graph.add((uri, EV['feed'], URIRef(ev['feed'])))
        #graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
        # recordFromEv stores '' when the event has no link, so test for
        # a non-empty value; URIRef('') would be a bogus empty URI.
        htmlLink = ev.get('htmlLink')
        if htmlLink:
            graph.add((uri, EV['htmlLink'], URIRef(htmlLink)))
    return graph



def getCalendarService(client_secrets='client_secret.json',
                       credential_storage='calendar.dat',
                       scope='https://www.googleapis.com/auth/calendar.readonly',
                       name='calendar',
                       version='v3'):
    """
    Return an authorized Google API service object (calendar v3 by
    default).

    Stored credentials are read from credential_storage; if they are
    missing or invalid, the oauth flow is run (without a local
    webserver) and the result is written back to that file.

    see
    https://cloud.google.com/docs/authentication/end-user#creating_your_client_credentials
    for getting client_secret.json . Use 'application type' of
    'other'.
    """
    # Import the submodules explicitly: the file-level imports only load
    # oauth2client.file, so oauth2client.tools is not guaranteed to be
    # available as an attribute of the package.
    from oauth2client import client, tools

    flow = client.flow_from_clientsecrets(client_secrets, scope=scope)

    storage = oauth2client.file.Storage(credential_storage)
    credentials = storage.get()
    if credentials is None or credentials.invalid:
        class Flags:
            logging_level = 'INFO'
            noauth_local_webserver = True
        credentials = tools.run_flow(flow, storage, Flags)
        # (storage now writes back to calendar.dat)
    http = credentials.authorize(http=build_http())

    service = discovery.build(name, version, http=http)
    return service

def getFirstPageOfCalendars(service):
    """
    Yield the 'id' of every entry in the 'items' of one
    calendarList().list() response. No further pages are requested.
    """
    listing = service.calendarList().list().execute()
    for entry in listing['items']:
        yield entry['id']

def dayRange(days):
    """
    Return (start, end) as timezone-aware local datetimes: start is
    12 hours before now, end is `days` days after now.
    """
    current = datetime.datetime.now(tzlocal())
    lookBack = datetime.timedelta(hours=12)
    lookAhead = datetime.timedelta(days=days)
    return current - lookBack, current + lookAhead

def limitDays(recs, days):
    """
    Return the records that overlap the window from dayRange(days),
    with the window start pushed back a further 12 hours.

    (marked 'incomplete' by the original author)
    """
    windowStart, windowEnd = dayRange(days)
    windowStart = windowStart - datetime.timedelta(hours=12)
    kept = []
    for rec in recs:
        if rec['startTime'] < windowEnd and rec['endTime'] > windowStart:
            kept.append(rec)
    return kept

def starred(graph, ev):
    """
    If the ev:title of `ev` in `graph` ends with '*' (optionally
    followed by whitespace), return the text before that star;
    otherwise return None.
    """
    found = re.search(r'(.*)\*\s*$', graph.value(ev, EV['title']))
    if not found:
        return None
    return found.group(1)

def filterStarred(recs, maxCount=15):
    """
    Return up to maxCount records whose title ends with '*'
    (optionally followed by whitespace), ordered by their 'start'
    value.

    recs: iterable of record dicts with 'start' and 'title' keys.
    maxCount: maximum number of records returned; zero or a negative
      value gives an empty list.
    """
    if maxCount <= 0:
        # Without this guard the loop below would still return the first
        # match, since the limit is only checked after appending.
        return []
    starPattern = re.compile(r'(.*)\*\s*$')
    out = []
    for rec in sorted(recs, key=lambda r: r['start']):
        if starPattern.search(rec['title']):
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out

class SyncToMongo(object):
    """reads gcal, writes to mongodb, and refreshes the two graphs"""
    def __init__(self, conf, collection, agendaGraph, countdownGraph):
        """
        conf: parsed config dict (passed on to recordFromEv)
        collection: mongo collection holding the event records
        agendaGraph, countdownGraph: graphs replaced on each update
        """
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=30*6):
        """
        Re-read the events in dayRange(days) from every calendar on the
        first page of the calendar list, store them in mongo, and
        rebuild the graphs from them.

        Stored records in the range are removed before fetching, so a
        failed fetch leaves that range empty until a later update
        succeeds.
        """
        start, end = dayRange(days)
        self.removeEntries(start, end)

        currentRecords = []
        for calId in getFirstPageOfCalendars(self.service):
            log.info('read %s', calId)
            eventsApi = self.service.events()
            request = eventsApi.list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
            )
            # Follow result pages; list_next returns None after the last
            # one. Previously only the first page was read.
            while request is not None:
                response = request.execute()
                for ev in response.get('items', []):
                    rec = recordFromEv(self.conf, calId, ev)
                    self.upsertMongo(rec)
                    currentRecords.append(rec)
                request = eventsApi.list_next(request, response)

        self.updateGraphs(currentRecords)

    def removeEntries(self, start, end):
        """Delete stored records whose startTime is within [start, end]."""
        self.collection.delete_many({"startTime": {"$gte": start, "$lte": end}})

    def upsertMongo(self, rec):
        """
        Insert rec (keyed by its uri as _id) unless a record with that
        id already exists. Returns [rec] if inserted, else [].
        """
        if self.collection.find_one({"_id" : rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("new records %s", rec)
            d = rec.copy()
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords):
        """
        Replace the agenda graph with the records from limitDays(...,
        days=2) and the countdown graph with up to 15 starred records.
        """
        c = EV['gcalendar']
        # may be a generator; both graphs need to iterate it
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph(
            [(s,p,o,c) for s,p,o in asGraph(limitDays(currentRecords, days=2))])
        self.countdownGraph.setToGraph(
            [(s,p,o,c) for s,p,o in asGraph(filterStarred(currentRecords, maxCount=15),
                                            extraClasses=[EV['CountdownEvent']])])


class ReadMongoEvents(object):
    """read event records back out of the mongo collection"""
    def __init__(self, collection):
        self.collection = collection

    def getEvents(self, t1, t2):
        """
        Yield stored records with t1 <= startTime < t2, ordered by
        startTime ascending. Each record's '_id' is renamed to 'uri';
        records that have a 'feedId' key get a placeholder 'feed' URI.
        """
        query = {"startTime": {"$gte": t1, "$lt": t2}}
        cursor = self.collection.find(query).sort([("startTime", 1)])
        for row in cursor:
            row['uri'] = row.pop('_id')
            if 'feedId' in row:
                row['feed'] = URIRef('old_event')
            yield row



class Poller(object):
    """
    Calls sync.update() every periodSec seconds on the twisted
    reactor and records when the last successful update started.
    """
    def __init__(self, sync, periodSec):
        self.sync = sync
        # until an update succeeds, this is the construction time
        self.lastUpdateTime = time.time()
        self.everUpdated = False
        self.periodSec = periodSec
        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
        self.events = None

    def updateNow(self):
        """Run an update immediately instead of waiting for the timer."""
        if self.scheduled.active():
            self.scheduled.cancel()
        self._updateLoop()

    def _updateLoop(self):
        """Run one update, then schedule the next one."""
        log.info("updating")
        t1 = time.time()
        try:
            self.sync.update()
        except Exception:
            traceback.print_exc()
            log.error("updated failed")
        else:
            # Only a successful update counts; otherwise repeated failures
            # would look like fresh updates to anything reading these.
            self.lastUpdateTime = t1
            self.everUpdated = True
        took = time.time() - t1
        # keep the period measured start-to-start, but wait at least 3 sec
        self.scheduled = reactor.callLater(max(3, self.periodSec - took),
                                            self._updateLoop)

class PollNow(cyclone.web.RequestHandler):
    """POST here to run a calendar update right away."""
    def post(self):
        poller = self.settings.poller
        poller.updateNow()
        self.set_status(202)

class Index(cyclone.web.RequestHandler):
    """
    Status page: serves index.html with MSG replaced by a note about
    the last update.
    """
    def get(self):
        """
        Raises ValueError (instead of serving the page) when the
        poller's lastUpdateTime is more than 1.1 poll periods old.
        """
        period = self.settings.conf['minutes_between_polls'] * 60
        poller = self.settings.poller
        ago = time.time() - poller.lastUpdateTime
        if poller.everUpdated:
            msg = "last update was %d sec ago" % ago
        else:
            msg = "no completed updates %d sec after startup" % ago
        if ago > period * 1.1:
            raise ValueError(msg)

        # read via a context manager so the file handle is closed promptly
        with open("index.html") as f:
            page = f.read()
        self.set_header("content-type", "text/html")
        self.write(page.replace("MSG", msg))


class EventsPage(cyclone.web.RequestHandler):
    def get(self):
        """
        stored events with t1 <= startTime < t2, written as n3

        Query arguments (all optional):
          t1: window start; default is the most recent local midnight
          t2: window end; default is now + `days` days
          days: used only when t2 is absent; default 2

        t1/t2 values given without a timezone are taken as local time.
        """
        arg = self.get_argument
        # Use timezone-aware times throughout: naive datetimes sent to
        # mongo are treated as UTC, which would shift the window by the
        # local UTC offset.
        now = datetime.datetime.now(tzlocal())

        def parseLocal(text):
            t = parse(text)
            if t.tzinfo is None:
                t = t.replace(tzinfo=tzlocal())
            return t

        t1Arg = arg('t1', default=None)
        if t1Arg:
            t1 = parseLocal(t1Arg)
        else:
            t1 = now.replace(hour=0, minute=0, second=0, microsecond=0)

        t2Arg = arg('t2', default=None)
        if t2Arg:
            t2 = parseLocal(t2Arg)
        else:
            daysArg = arg('days', default=None)
            t2 = now + datetime.timedelta(days=int(daysArg) if daysArg else 2)

        if 0:
            # JSON-LD output, currently disabled
            self.set_header("content-type", "application/ld+json")
            self.write(asJsonLd(self.settings.read.getEvents(t1, t2)))
        else:
            self.set_header("content-type", "text/n3")
            self.write(asGraph(self.settings.read.getEvents(t1, t2)).serialize(format='n3'))


class Countdowns(cyclone.web.RequestHandler):
    """JSON-LD list of the starred events in the countdown graph."""
    def get(self):
        graph = self.settings.countdownGraph._graph
        rows = []
        for ev in graph.subjects(RDF.type, EV['Event']):
            label = starred(graph, ev)
            if label is None:
                # title has no trailing star
                continue
            rows.append({
                '@type': 'countdown',
                'time': graph.value(ev, EV['start']),
                'label': label,
            })

        context = {
            "countdown":"http://bigasterisk.com/countdown#CountdownEvent",
            "label": "http://www.w3.org/2000/01/rdf-schema#label",
            "time": {
                "@id": "http://bigasterisk.com/event#time",
                "@type": "xsd:dateTime"
            },
            "xsd": "http://www.w3.org/2001/XMLSchema#",
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
        }
        self.set_header("content-type", "application/ld+json")
        self.write(json.dumps({"@context": context, "@graph": rows}))


def main():
    """
    Parse the command line, read gcalendarwatch.conf, fill the graphs
    from the records already in mongo, start the poller, and serve
    the web handlers on conf['serve_port'] until the reactor stops.
    """
    args = docopt.docopt('''
Usage:
  gcalendarwatch [options]

Options:
  -v, --verbose  more logging
  --now          don't wait for first update
''')

    verboseLogging(args['--verbose'])

    agendaGraph = PatchableGraph() # next few days
    countdownGraph = PatchableGraph() # next n of starred events
    # context manager so the config file handle is closed after parsing
    with open("gcalendarwatch.conf") as confFile:
        conf = json.load(confFile)
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'],
                           tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # show what mongo already has, before the first poll completes
    s, e = dayRange(60)
    sync.updateGraphs(read.getEvents(s, e))

    poller = Poller(sync, conf['minutes_between_polls'] * 60)
    if args['--now']:
        poller.updateNow()

    class Application(cyclone.web.Application):
        def __init__(self):
            handlers = [
                (r"/", Index),
                (r'/events', EventsPage),
                (r'/pollNow', PollNow),
                (r'/graph', CycloneGraphHandler, {'masterGraph': agendaGraph}),
                (r'/graph/events', CycloneGraphEventsHandler, {'masterGraph': agendaGraph}),
                (r'/countdownGraph', CycloneGraphHandler, {'masterGraph': countdownGraph}),
                (r'/countdownGraph/events', CycloneGraphEventsHandler, {'masterGraph': countdownGraph}),
                (r'/countdowns.json', Countdowns),
            ]
            # keyword args become self.settings.<name> in the handlers
            cyclone.web.Application.__init__(self, handlers,
                                             conf=conf,
                                             read=read,
                                             poller=poller,
                                             agendaGraph=agendaGraph,
                                             countdownGraph=countdownGraph,
            )
    reactor.listenTCP(conf['serve_port'], Application())
    reactor.run()

# Start the service only when this file is run as a script.
if __name__ == '__main__':
    main()