view gcalendarwatch.py @ 20:8c7af0d1b118

format
author drewp@bigasterisk.com
date Thu, 29 Oct 2020 23:48:35 -0700
parents 0af075b62e4a
children 9eb6b4806272
line wrap: on
line source

"""
sync google calendar into mongodb, return queries from that as
JSON-LD.

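gcalendarwatch.conf looks like this (the values shown are placeholders):
{
  "minutes_between_polls" : 60,
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"},
  "event_uri_ns" : "http://example.com/event/",
  "serve_port" : 8080
}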

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import datetime
import json
import re
import time
import traceback

import cyclone.web
import pymongo.collection
from dateutil.parser import parse
from dateutil.tz import tzlocal
from patchablegraph import (
    CycloneGraphEventsHandler,
    CycloneGraphHandler,
    PatchableGraph,
)
from calendar_connection import getCalendarService
from typing import Dict, Any
from pymongo import MongoClient
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor

import docopt
from prometheus_client import Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY

# Prometheus Summary; applied as the @UPDATE.time() decorator on SyncToMongo.update.
UPDATE = Summary('gcalendarwatch_updates', 'update loop calls')

# RDF namespace for the event classes and predicates used in this file
# (EV['Event'], EV['title'], EV['start'], ...).
EV = Namespace("http://bigasterisk.com/event#")
"""
Example of a Google Calendar event dict, the shape recordFromEv reads as its `ev` argument:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""


def feedFromCalId(conf, calId):
    """Return the feed URI for calendar `calId`, rooted at conf['event_uri_ns']."""
    feedPrefix = conf['event_uri_ns'] + 'feed/'
    return feedPrefix + calId


def recordFromEv(conf: Dict, calId: str, ev: Dict):
    """Convert one calendar event dict into the flat record stored in mongodb.

    The record carries the event uri (conf['event_uri_ns'] + ev['id']), its
    feed uri, title, the raw 'start'/'end' strings, and for each of start/end
    a parsed '<field>Time' datetime plus a '<field>Date' ISO date string.
    """

    def rawValue(when):
        # 'date' is used when present; otherwise the entry must have 'dateTime'
        return when['date'] if 'date' in when else when['dateTime']

    rec = {}
    rec['uri'] = conf['event_uri_ns'] + ev['id']
    rec['feed'] = feedFromCalId(conf, calId)
    rec['title'] = ev.get('summary', '?')
    rec['start'] = rawValue(ev['start'])
    rec['end'] = rawValue(ev['end'])
    rec['endTimeUnspecified'] = ev.get('endTimeUnspecified', False)
    rec['htmlLink'] = ev.get('htmlLink', '')
    rec['creatorEmail'] = ev.get('creator', {}).get('email', '')

    for field in ('start', 'end'):
        when = ev[field]
        timeKey = '%sTime' % field
        dateKey = '%sDate' % field
        if 'date' not in when:
            stamp = parse(when['dateTime'])
            rec[timeKey] = stamp
            rec[dateKey] = stamp.date().isoformat()
        else:
            # date-only value: the parsed datetime is stamped with the local timezone
            rec[timeKey] = parse(when['date']).replace(tzinfo=tzlocal())
            rec[dateKey] = when['date']
    return rec


def asJsonLd(events):
    """Build a JSON-LD document ({'@graph': [...], '@context': {...}}) from event records.

    Each record in `events` is modified in place: 'startTime' and 'endTime'
    are converted to local-timezone ISO strings and 'uri' is renamed to '@id'.
    """

    def typed(term, datatype):
        # context entry mapping a record key to an ev: predicate with a datatype
        return {"@id": "ev:" + term, "@type": datatype}

    nodes = []
    for ev in events:
        for key in ('startTime', 'endTime'):
            ev[key] = ev[key].astimezone(tzlocal()).isoformat()
        ev['@id'] = ev.pop('uri')
        nodes.append(ev)

    context = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": typed("startTime", "xsd:dateTime"),
        "endTime": typed("endTime", "xsd:dateTime"),
        "startDate": typed("startDate", "xsd:date"),
        "endDate": typed("endDate", "xsd:date"),
        "title": "ev:title",
        "feed": typed("feed", "@id"),
        "htmlLink": typed("htmlLink", "@id"),
    }
    return {'@graph': nodes, '@context': context}


def asGraph(events, extraClasses=()):
    """Build an rdflib Graph describing the given event records.

    Every event becomes a subject (URIRef of its 'uri') typed as EV['Event']
    and as each class in `extraClasses`, with title, start, startDate, end and
    feed statements. An htmlLink statement is added only when the record has a
    non-empty 'htmlLink'.

    events: iterable of record dicts with 'uri', 'title', 'start',
        'startDate', 'end', 'feed' and optionally 'htmlLink'.
    extraClasses: iterable of additional rdf:type objects for every event.
    """
    graph = Graph()
    graph.namespace_manager.bind('ev', EV)
    for ev in events:
        uri = URIRef(ev['uri'])

        def add(p, o):
            return graph.add((uri, p, o))

        add(RDF.type, EV['Event'])
        for cls in extraClasses:
            add(RDF.type, cls)
        add(EV['title'], Literal(ev['title']))
        add(EV['start'], Literal(ev['start']))
        add(EV['startDate'], Literal(ev['startDate']))
        add(EV['end'], Literal(ev['end']))
        add(EV['feed'], URIRef(ev['feed']))
        # graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
        # recordFromEv stores '' when the source event has no link; skip that
        # rather than emitting an empty URIRef.
        htmlLink = ev.get('htmlLink')
        if htmlLink:
            add(EV['htmlLink'], URIRef(htmlLink))
    return graph


def getFirstPageOfCalendars(service):
    """Yield the 'id' of each item returned by one calendarList().list() request.

    Only a single request is executed; it happens lazily, when the generator
    is first iterated.
    """
    firstPage = service.calendarList().list().execute()
    yield from (entry['id'] for entry in firstPage['items'])


def dayRange(days):
    """Return (start, end) around the current local time.

    start is 12 hours before now and end is `days` days after now; both are
    timezone-aware datetimes in the local timezone.
    """
    current = datetime.datetime.now(tzlocal())
    lookBack = datetime.timedelta(hours=12)
    lookAhead = datetime.timedelta(days=days)
    return current - lookBack, current + lookAhead


def limitDays(recs, days):
    """Return the records from `recs` that overlap the window for `days`.

    The window runs from dayRange's start pushed back a further 12 hours up to
    dayRange's end. A record is kept when its 'startTime' is before the window
    end and its 'endTime' is after the window start.
    """
    windowStart, windowEnd = dayRange(days)
    windowStart = windowStart - datetime.timedelta(hours=12)
    # NOTE: the original author marked this filter as incomplete
    kept = []
    for rec in recs:
        if rec['startTime'] < windowEnd and rec['endTime'] > windowStart:
            kept.append(rec)
    return kept


def starred(graph, ev):
    """Return the title text before a trailing '*' for event `ev`, or None.

    The title is read from `graph` as the EV['title'] value of `ev`. A title
    counts as starred when it ends with '*' optionally followed by whitespace.
    """
    found = re.search(r'(.*)\*\s*$', graph.value(ev, EV['title']))
    return found.group(1) if found else None


def filterStarred(recs, maxCount=15):
    """Return starred records ordered by their 'start' value.

    A record is starred when its 'title' ends with '*' (optionally followed by
    whitespace). Collection stops once `maxCount` records have been gathered.
    """
    starPattern = re.compile(r'(.*)\*\s*$')
    picked = []
    for candidate in sorted(recs, key=lambda r: r['start']):
        if not starPattern.search(candidate['title']):
            continue
        picked.append(candidate)
        if len(picked) >= maxCount:
            break
    return picked


class SyncToMongo(object):
    """Reads events from Google Calendar and writes them to a mongodb collection.

    After each sync, the agenda and countdown PatchableGraphs are replaced
    with graphs built from the records that were just read.
    """
    collection: pymongo.collection.Collection

    def __init__(self, conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    @UPDATE.time()
    def update(self, days=10, cal=None) -> int:
        """Re-read events in the dayRange(days) window and store them.

        Stored documents whose startTime falls in the window (restricted to
        the feed of `cal` when given) are deleted, then every event returned
        by the calendar service is stored again and both graphs are rebuilt.

        days: size of the window, passed to dayRange.
        cal: optional calendar id; when set, only that calendar is read.
        Returns the number of fetched events whose uri was not among the
        documents previously stored in the window.
        """
        start, end = dayRange(days)
        spec = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # a set, because membership is tested once per fetched event below
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        # delete_many returns a DeleteResult; the count lives on .deleted_count
        deleted = self.collection.delete_many(spec)
        log.info(f'cleared {deleted.deleted_count} records before reread')

        totalNew = 0
        currentRecords = []
        for calId in getFirstPageOfCalendars(self.service):
            if cal and calId != cal:
                continue
            log.info('read %s', calId)
            events = self.service.events().list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
            ).execute()

            for ev in events['items']:
                rec = recordFromEv(self.conf, calId, ev)
                self.upsertMongo(rec)
                if rec['uri'] not in idsFormerlyInRange:
                    totalNew += 1
                currentRecords.append(rec)

        self.updateGraphs(currentRecords)
        return totalNew

    def upsertMongo(self, rec):
        """Insert `rec` (keyed by its uri as _id) unless a document with that id exists.

        Returns [rec] when a document was inserted and [] when one was
        already present; existing documents are left unchanged.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = rec.copy()
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords):
        """Replace both graphs' contents with statements built from `currentRecords`.

        The agenda graph gets the records passing limitDays(days=2); the
        countdown graph gets up to 15 starred records, additionally typed as
        EV['CountdownEvent']. All statements use the EV['gcalendar'] context.
        """
        c = EV['gcalendar']
        # materialize: the argument may be a generator and is consumed twice
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(limitDays(currentRecords, days=2))])
        self.countdownGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']])])


class ReadMongoEvents(object):
    """Reads stored event documents back out of a mongodb collection."""

    def __init__(self, collection):
        self.collection = collection

    def getEvents(self, t1, t2):
        """Yield documents with t1 <= startTime < t2, sorted by ascending startTime.

        Each document has its '_id' moved to 'uri'. Documents that contain a
        'feedId' key get their 'feed' set to URIRef('old_event').
        """
        timeWindow = {"$gte": t1, "$lt": t2}
        cursor = self.collection.find({"startTime": timeWindow})
        for doc in cursor.sort([("startTime", 1)]):
            doc['uri'] = doc.pop('_id')
            if 'feedId' in doc:
                doc['feed'] = URIRef('old_event')
            yield doc


class Poller(object):
    """Runs sync.update() periodically on the twisted reactor.

    lastUpdateTime and everUpdated describe the most recent update that
    completed without raising; they are read by the Index handler to report
    staleness.
    """

    def __init__(self, sync, periodSec):
        """Schedule the first update `periodSec` seconds from now.

        sync: object whose update(cal=...) method performs one sync and
            returns the number of new records.
        periodSec: target number of seconds between update starts.
        """
        self.sync = sync
        # until an update succeeds, this holds the construction time
        self.lastUpdateTime = time.time()
        self.everUpdated = False
        self.periodSec = periodSec
        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
        self.events = None

    def updateNow(self, cal=None) -> int:
        """Cancel the pending scheduled update and run one immediately.

        Returns the number of new records found (0 if the update failed).
        """
        self.scheduled.cancel()
        return self._updateLoop(cal)

    def _updateLoop(self, cal=None) -> int:
        """Run one update, then schedule the next one.

        cal: optional calendar id to restrict this update to; the next
            scheduled run always covers all calendars.
        Returns the number of new records, or 0 when the update raised.
        """
        log.info(f"updating {cal or 'all'}")
        t1 = time.time()
        try:
            n = self.sync.update(cal=cal)
        except Exception:
            traceback.print_exc()
            log.error("update failed")
            n = 0
        else:
            # only a successful update counts, so that persistent failures
            # show up as staleness instead of looking like fresh updates
            self.lastUpdateTime = t1
            self.everUpdated = True
        took = time.time() - t1
        # subtract the time this run took, but always wait at least 3 seconds
        self.scheduled = reactor.callLater(max(3, self.periodSec - took), self._updateLoop)
        return n


class PollNow(cyclone.web.RequestHandler):
    """Handler that triggers an immediate poll via settings.poller.updateNow."""

    def post(self):
        """Run an update now and respond with how many new records were found.

        If the request has a body, it is parsed as JSON and its optional
        'cal' value is passed on to the poller.
        """
        cal = None
        if self.request.body:
            payload = json.loads(self.request.body)
            cal = payload.get('cal', None)
        newCount = self.settings.poller.updateNow(cal)
        msg = f"found {newCount} new records"
        log.info(msg)
        self.write(msg.encode('utf8'))


class Index(cyclone.web.RequestHandler):
    """Status page that also acts as a staleness check on the poller."""

    def get(self):
        """Serve index.html with "MSG" replaced by a poller status message.

        Raises ValueError (carrying the status message) when the time since
        poller.lastUpdateTime exceeds 1.1 times the configured poll period.
        """
        period = self.settings.conf['minutes_between_polls'] * 60
        ago = time.time() - self.settings.poller.lastUpdateTime
        if not self.settings.poller.everUpdated:
            msg = "no completed updates %d sec after startup" % ago
        else:
            msg = "last update was %d sec ago" % ago
        # 10% slack over the poll period before the poller is considered stale
        if ago > period * 1.1:
            raise ValueError(msg)
        self.set_header("content-type", "text/html")
        # context manager so the file handle is closed on every request
        with open("index.html") as page:
            template = page.read()
        self.write(template.replace("MSG", msg))


class EventsPage(cyclone.web.RequestHandler):
    """Serves stored events in a time window as an RDF (n3) document."""

    def get(self):
        """
        upcoming events as n3 (the JSON-LD output branch is currently disabled)

        Query arguments, all optional:
          t1: window start, parsed with dateutil; defaults to the start of
              today in the local timezone.
          t2: window end, parsed with dateutil; defaults to now plus `days`.
          days: integer number of days used for the default t2; defaults to 2.
        """
        arg = self.get_argument
        # timezone-aware like dayRange(); a naive datetime would be treated
        # as UTC when used in the mongodb query
        now = datetime.datetime.now(tzlocal())

        t1Arg = arg('t1', default=None)
        if t1Arg:
            t1 = parse(t1Arg)
        else:
            t1 = now.replace(hour=0, minute=0, second=0, microsecond=0)

        t2Arg = arg('t2', default=None)
        if t2Arg:
            t2 = parse(t2Arg)
        else:
            daysArg = arg('days', default=None)
            t2 = now + datetime.timedelta(days=int(daysArg) if daysArg else 2)

        if 0:
            self.set_header("content-type", "application/ld+json")
            self.write(asJsonLd(self.settings.read.getEvents(t1, t2)))
        else:
            self.set_header("content-type", "text/n3")
            self.write(asGraph(self.settings.read.getEvents(t1, t2)).serialize(format='n3'))


class Countdowns(cyclone.web.RequestHandler):
    """Serves the starred events of the countdown graph as JSON-LD."""

    def get(self):
        """Write one 'countdown' row (time, label) per starred EV['Event'] in the graph."""
        graph = self.settings.countdownGraph._graph
        rows = []
        for ev in graph.subjects(RDF.type, EV['Event']):
            label = starred(graph, ev)
            if label is None:
                continue
            rows.append({'@type': 'countdown', 'time': graph.value(ev, EV['start']), 'label': label})

        context = {
            "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
            "label": "http://www.w3.org/2000/01/rdf-schema#label",
            "time": {
                "@id": "http://bigasterisk.com/event#time",
                "@type": "xsd:dateTime"
            },
            "xsd": "http://www.w3.org/2001/XMLSchema#",
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
        }
        self.set_header("content-type", "application/ld+json")
        document = {"@context": context, "@graph": rows}
        self.write(json.dumps(document))


class Metrics(cyclone.web.RequestHandler):
    """Exposes the prometheus registry in text exposition format."""

    def get(self):
        """Write the latest metrics from the default prometheus REGISTRY."""
        # set_header (as the other handlers here use) replaces the handler's
        # content-type; add_header would append a second content-type value.
        self.set_header('content-type', 'text/plain')
        self.write(generate_latest(REGISTRY))


def main():
    """Parse command-line options, load gcalendarwatch.conf, and run the service.

    Sets up the mongodb collection, the sync and read helpers, and the two
    PatchableGraphs; fills the graphs from events already stored in mongodb;
    starts the poller (updating immediately when --now is given); then serves
    the HTTP handlers on conf['serve_port'] until the reactor stops.
    """
    args = docopt.docopt('''
Usage:
  gcalendarwatch [options]

Options:
  -v, --verbose  more logging
  --now          don't wait for first update
''')

    verboseLogging(args['--verbose'])

    agendaGraph = PatchableGraph()  # next few days
    countdownGraph = PatchableGraph()  # next n of starred events
    # context manager so the config file handle is closed after loading
    with open("gcalendarwatch.conf") as confFile:
        conf = json.load(confFile)
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # populate the graphs from stored events before the first poll happens
    s, e = dayRange(60)
    sync.updateGraphs(read.getEvents(s, e))

    poller = Poller(sync, conf['minutes_between_polls'] * 60)
    if args['--now']:
        poller.updateNow()

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                (r"/", Index),
                (r'/events', EventsPage),
                (r'/pollNow', PollNow),
                (r'/graph/calendar/upcoming', CycloneGraphHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/upcoming/events', CycloneGraphEventsHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/countdown', CycloneGraphHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/graph/calendar/countdown/events', CycloneGraphEventsHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/countdowns.json', Countdowns),
                (r'/metrics', Metrics),
            ]
            # keyword arguments become the `settings` that the handlers read
            cyclone.web.Application.__init__(
                self,
                handlers,
                conf=conf,
                read=read,
                poller=poller,
                agendaGraph=agendaGraph,
                countdownGraph=countdownGraph,
            )

    reactor.listenTCP(conf['serve_port'], Application())
    reactor.run()


# Start the service only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()