view gcalendarwatch.py @ 21:9eb6b4806272

timezone fixes, especially on GET /events
author drewp@bigasterisk.com
date Mon, 08 Feb 2021 16:40:47 -0800
parents 8c7af0d1b118
children 8122ff3b0fe5
line wrap: on
line source

"""
sync google calendar into mongodb, return queries from that as
JSON-LD.

gcalendarwatch.conf is JSON and looks like this:
{
  "minutes_between_polls" : 60,
  "event_uri_ns" : "http://example.com/event/",
  "serve_port" : 8000,
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}
}

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import datetime
import json
import re
import time
import traceback

import cyclone.web
import pymongo.collection
from dateutil.parser import parse
from dateutil.tz import tzlocal
from patchablegraph import (
    CycloneGraphEventsHandler,
    CycloneGraphHandler,
    PatchableGraph,
)
from calendar_connection import getCalendarService
from typing import Dict, Any
from pymongo import MongoClient
from rdflib import Graph, Literal, Namespace, RDF, URIRef
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor

import docopt
from prometheus_client import Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY

# Prometheus summary; SyncToMongo.update is decorated with UPDATE.time(), so
# each sync call's duration is recorded here.
UPDATE = Summary('gcalendarwatch_updates', 'update loop calls')

# RDF namespace for the event classes and predicates in the published graphs.
EV = Namespace("http://bigasterisk.com/event#")
# Reference only: a sample event resource as returned by the Google Calendar
# events().list() call in SyncToMongo.update. recordFromEv reads its 'id',
# 'summary', 'start', 'end', 'endTimeUnspecified', 'htmlLink' and 'creator'
# fields. The string below is a bare expression and is discarded at runtime.
"""
example:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""


def feedFromCalId(conf, calId):
    """Return the feed URI for calendar `calId`.

    The URI is conf['event_uri_ns'] followed by 'feed/' and the calendar id.
    """
    return ''.join((conf['event_uri_ns'], 'feed/', calId))


def recordFromEv(conf: Dict, calId: str, ev: Dict):
    """Flatten one Google Calendar event resource into the record stored in mongo.

    start/end hold the raw value Google sent: 'date' for all-day events,
    'dateTime' otherwise. startTime/endTime are parsed datetimes; an all-day
    date is pinned to midnight in the local timezone. startDate/endDate are
    ISO date strings.
    """
    def rawValue(spec):
        # all-day endpoints carry 'date'; timed ones carry 'dateTime'
        return spec['date'] if 'date' in spec else spec['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': rawValue(ev['start']),
        'end': rawValue(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for prefix in ('start', 'end'):
        spec = ev[prefix]
        if 'date' in spec:
            when = parse(spec['date']).replace(tzinfo=tzlocal())
            day = spec['date']
        else:
            when = parse(spec['dateTime'])
            day = when.date().isoformat()
        rec[prefix + 'Time'] = when
        rec[prefix + 'Date'] = day
    return rec


def asJsonLd(events):
    """Render event records as a JSON-LD document (returned as a dict).

    Each record needs a 'uri' and tz-aware 'startTime'/'endTime' datetimes.
    In the output, 'uri' becomes '@id' and the datetimes become ISO strings
    in the local timezone. The input records are copied, not modified, so
    the caller can keep using them.
    """
    graph = []
    for ev in events:
        out = dict(ev)
        out['startTime'] = out['startTime'].astimezone(tzlocal()).isoformat()
        out['endTime'] = out['endTime'].astimezone(tzlocal()).isoformat()
        out['@id'] = out.pop('uri')
        graph.append(out)

    context = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": {
            "@id": "ev:startTime",
            "@type": "xsd:dateTime"
        },
        "endTime": {
            "@id": "ev:endTime",
            "@type": "xsd:dateTime"
        },
        "startDate": {
            "@id": "ev:startDate",
            "@type": "xsd:date"
        },
        "endDate": {
            "@id": "ev:endDate",
            "@type": "xsd:date"
        },
        "title": "ev:title",
        "feed": {
            "@id": "ev:feed",
            "@type": "@id"
        },
        "htmlLink": {
            "@id": "ev:htmlLink",
            "@type": "@id"
        },
    }
    return {'@graph': graph, '@context': context}


def asGraph(events, extraClasses=()):
    """Build an rdflib Graph describing event records.

    Every record's 'uri' becomes a subject typed ev:Event plus each class in
    `extraClasses`. The subject gets ev:title, ev:start, ev:startDate and
    ev:end literals, ev:feed as a URI, and ev:htmlLink when the record has a
    non-empty link.
    """
    graph = Graph()
    graph.namespace_manager.bind('ev', EV)
    for ev in events:
        uri = URIRef(ev['uri'])
        graph.add((uri, RDF.type, EV['Event']))
        for cls in extraClasses:
            graph.add((uri, RDF.type, cls))
        for pred, obj in [
            (EV['title'], Literal(ev['title'])),
            (EV['start'], Literal(ev['start'])),
            (EV['startDate'], Literal(ev['startDate'])),
            (EV['end'], Literal(ev['end'])),
            (EV['feed'], URIRef(ev['feed'])),
        ]:
            graph.add((uri, pred, obj))
        # recordFromEv stores '' when Google supplied no link; emitting
        # URIRef('') would publish a meaningless empty IRI, so skip it.
        if ev.get('htmlLink'):
            graph.add((uri, EV['htmlLink'], URIRef(ev['htmlLink'])))
    return graph


def getFirstPageOfCalendars(service):
    """Yield the id of each calendar in one calendarList().list() response.

    Only a single page of results is requested; no paging is done. Because
    this is a generator, the API call happens on the first iteration.
    """
    response = service.calendarList().list().execute()
    yield from (entry['id'] for entry in response['items'])


def dayRange(days):
    """Return a (start, end) pair of tz-aware local datetimes.

    The range runs from 12 hours before now to `days` days after now.
    """
    now = datetime.datetime.now(tzlocal())
    return (now - datetime.timedelta(hours=12),
            now + datetime.timedelta(days=days))


def limitDays(recs, days):
    """Keep the records whose [startTime, endTime] overlaps a window.

    The window runs from 24 hours ago to `days` days ahead: dayRange already
    starts 12 hours back, and this widens it by another 12.
    """
    windowStart, windowEnd = dayRange(days)
    windowStart -= datetime.timedelta(hours=12)
    # author's note: incomplete
    return [rec for rec in recs
            if rec['startTime'] < windowEnd and rec['endTime'] > windowStart]


def starred(graph, ev):
    """Return the label of a starred event, or None if it is not starred.

    An event is starred when its ev:title ends with '*', optionally followed
    by whitespace. The label is everything before the star, including any
    whitespace in front of it. An event with no ev:title is not starred.
    """
    title = graph.value(ev, EV['title'])
    if title is None:
        # Graph.value returns None when there is no matching triple, and
        # re.search would raise TypeError on None.
        return None
    m = re.search(r'(.*)\*\s*$', title)
    return m.group(1) if m else None


def filterStarred(recs, maxCount=15):
    """Return up to `maxCount` starred records, earliest first.

    A record is starred when its 'title' ends with '*', optionally followed
    by whitespace.

    Records are ordered by their tz-aware 'startTime' datetime, not by the
    raw 'start' string. Those strings mix all-day dates with timestamps that
    carry different UTC offsets, so they do not sort chronologically.
    A non-positive `maxCount` returns an empty list.
    """
    if maxCount <= 0:
        return []
    chronological = sorted(recs, key=lambda r: r['startTime'])
    return [r for r in chronological if re.search(r'\*\s*$', r['title'])][:maxCount]


class SyncToMongo(object):
    """Read events from Google Calendar, store them in mongodb, and publish
    the freshly read events to the agenda and countdown graphs."""
    collection: pymongo.collection.Collection

    def __init__(self, conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    @UPDATE.time()
    def update(self, days=10, cal=None) -> int:
        """Re-sync events in the dayRange(days) window; return how many are new.

        If `cal` is given, only that calendar id is read, and only its stored
        records in the window are replaced.

        Google is queried before mongo is touched. If the API call fails, the
        previously stored records stay in place instead of being deleted.

        NOTE(review): when `cal` is given, the graphs are rebuilt from that one
        calendar's events only, so other calendars drop out of them until the
        next full update.
        """
        start, end = dayRange(days)
        currentRecords = self._fetchRecords(start, end, cal)

        spec = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # a set, since membership is tested once per fetched record below
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        result = self.collection.delete_many(spec)
        log.info(f'cleared {result.deleted_count} records before reread')

        totalNew = 0
        for rec in currentRecords:
            self.upsertMongo(rec)
            if rec['uri'] not in idsFormerlyInRange:
                totalNew += 1

        self.updateGraphs(currentRecords)
        return totalNew

    def _fetchRecords(self, start, end, cal=None):
        """Read events in [start, end] from every calendar (or just `cal`) as records."""
        records = []
        for calId in getFirstPageOfCalendars(self.service):
            if cal and calId != cal:
                continue
            log.info('read %s', calId)
            events = self.service.events().list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
            ).execute()
            # NOTE(review): only one response page (maxResults events) is read
            records.extend(recordFromEv(self.conf, calId, ev) for ev in events.get('items', []))
        return records

    def upsertMongo(self, rec):
        """Insert `rec` under _id=rec['uri'] unless that _id is already stored.

        Returns [rec] if it was inserted, or [] if an existing document was
        kept. Existing documents are not updated.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        log.debug("add record %s", rec)
        doc = dict(rec)
        doc['_id'] = doc.pop('uri')
        self.collection.insert_one(doc)
        return [rec]

    def updateGraphs(self, currentRecords):
        """Replace the contents of both published graphs from `currentRecords`.

        agendaGraph gets the records from limitDays(days=2). countdownGraph
        gets up to 15 starred records, typed ev:CountdownEvent. All triples
        go into the ev:gcalendar context.
        """
        ctx = EV['gcalendar']
        # materialize: callers may pass a one-shot generator, and it is read twice
        currentRecords = list(currentRecords)
        agenda = asGraph(limitDays(currentRecords, days=2))
        self.agendaGraph.setToGraph([(s, p, o, ctx) for s, p, o in agenda])
        countdown = asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']])
        self.countdownGraph.setToGraph([(s, p, o, ctx) for s, p, o in countdown])


class ReadMongoEvents(object):
    """Query stored event records back out of the mongo collection."""

    def __init__(self, collection):
        self.collection = collection

    def getEvents(self, t1, t2):
        """Yield stored records whose startTime is in [t1, t2), earliest first.

        Both bounds must be tz-aware. A naive bound raises TypeError; since
        this is a generator, that happens on the first iteration. Each
        yielded doc has its '_id' renamed back to 'uri'. Docs with a 'feedId'
        field (presumably an older schema) get the placeholder feed
        URIRef('old_event').
        """
        if any(t.tzinfo is None for t in (t1, t2)):
            raise TypeError("tz-naive datetimes")
        query = {"startTime": {"$gte": t1, "$lt": t2}}
        cursor = self.collection.find(query).sort([("startTime", 1)])
        for record in cursor:
            record['uri'] = record.pop('_id')
            if 'feedId' in record:
                record['feed'] = URIRef('old_event')
            yield record


class Poller(object):
    """Call `sync.update` every `periodSec` seconds on the twisted reactor.

    The first poll is scheduled `periodSec` seconds after construction.
    `updateNow` runs a poll immediately and reschedules the next one.
    """

    def __init__(self, sync, periodSec):
        self.sync = sync
        # Time the last *successful* update started. It is initialized to
        # construction time so that "time since" is measured from startup
        # until the first success.
        self.lastUpdateTime = time.time()
        self.everUpdated = False
        self.periodSec = periodSec
        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
        self.events = None

    def updateNow(self, cal=None) -> int:
        """Poll now (optionally only calendar `cal`); return the number of new records."""
        if self.scheduled.active():
            self.scheduled.cancel()
        return self._updateLoop(cal)

    def _updateLoop(self, cal=None) -> int:
        """Run one sync, schedule the next one, and return the new-record count (0 on failure)."""
        log.info(f"updating {cal or 'all'}")
        t1 = time.time()
        try:
            n = self.sync.update(cal=cal)
        except Exception:
            traceback.print_exc()
            log.error("update failed")
            n = 0
        else:
            # Only successful syncs refresh the timestamps that the Index
            # health check reads. Otherwise a sync that keeps failing would
            # still look healthy.
            self.lastUpdateTime = t1
            self.everUpdated = True
        took = time.time() - t1
        # wait at least 3s, even if the sync took longer than the period
        self.scheduled = reactor.callLater(max(3, self.periodSec - took), self._updateLoop)
        return n


class PollNow(cyclone.web.RequestHandler):
    """POST /pollNow: run a calendar sync immediately."""

    def post(self):
        """Poll now and reply with the number of new records.

        An optional JSON object body {"cal": "<calendar id>"} limits the sync
        to one calendar. A body that is not a JSON object gets a 400.
        """
        cal = None
        if self.request.body:
            try:
                body = json.loads(self.request.body)
            except ValueError:
                # covers json.JSONDecodeError and UnicodeDecodeError
                raise cyclone.web.HTTPError(400, "request body is not valid JSON")
            if not isinstance(body, dict):
                raise cyclone.web.HTTPError(400, "request body must be a JSON object")
            cal = body.get('cal', None)
        n = self.settings.poller.updateNow(cal)
        msg = f"found {n} new records"
        log.info(msg)
        self.write(msg.encode('utf8'))


class Index(cyclone.web.RequestHandler):
    """GET /: status page that also acts as a health check."""

    def get(self):
        """Render index.html with a status message in place of 'MSG'.

        Raises ValueError, which fails the request, when more than 1.1 poll
        periods have passed since poller.lastUpdateTime.
        """
        period = self.settings.conf['minutes_between_polls'] * 60
        poller = self.settings.poller
        ago = time.time() - poller.lastUpdateTime
        if poller.everUpdated:
            msg = "last update was %d sec ago" % ago
        else:
            msg = "no completed updates %d sec after startup" % ago
        if ago > period * 1.1:
            raise ValueError(msg)
        self.set_header("content-type", "text/html")
        with open("index.html") as f:
            page = f.read()
        self.write(page.replace("MSG", msg))


class EventsPage(cyclone.web.RequestHandler):
    """GET /events: stored events in a time window."""

    def get(self):
        """Serve events whose startTime falls in [t1, t2), as text/n3.

        Query arguments:
          days  -- window size for dayRange (default 2), i.e. from 12 hours
                   ago to `days` days ahead
          t1/t2 -- optional explicit bounds that override that window. Any
                   string dateutil can parse is accepted; a bound with no
                   timezone is taken as local time.
        Unparseable arguments get a 400 instead of a server error.
        """
        arg = self.get_argument
        try:
            t1, t2 = dayRange(int(arg('days', default='2')))
            if arg('t1', default=None):
                t1 = self._parseLocal(arg('t1'))
            if arg('t2', default=None):
                t2 = self._parseLocal(arg('t2'))
        except (ValueError, OverflowError) as err:
            raise cyclone.web.HTTPError(400, "bad days/t1/t2 argument: %s", err)
        log.info(f'get /events local t1={t1} t2={t2}')
        useJsonLd = False  # the JSON-LD rendering below is currently switched off
        events = self.settings.read.getEvents(t1, t2)
        if useJsonLd:
            self.set_header("content-type", "application/ld+json")
            self.write(asJsonLd(events))
        else:
            self.set_header("content-type", "text/n3")
            self.write(asGraph(events).serialize(format='n3'))

    @staticmethod
    def _parseLocal(s):
        """Parse `s` with dateutil; attach the local timezone if it has none."""
        # dateutil's parse() has no `tzinfo` keyword (passing one raises
        # TypeError), so the zone is attached afterwards.
        t = parse(s)
        if t.tzinfo is None:
            t = t.replace(tzinfo=tzlocal())
        return t


class Countdowns(cyclone.web.RequestHandler):
    """GET /countdowns.json: starred events from countdownGraph as JSON-LD."""

    def get(self):
        """Emit one 'countdown' row per starred ev:Event.

        Each row has the event's ev:start as 'time' and its title, minus the
        trailing star, as 'label'.
        """
        g = self.settings.countdownGraph._graph  # NOTE: reads PatchableGraph's private graph
        rows = []
        for event in g.subjects(RDF.type, EV['Event']):
            label = starred(g, event)
            if label is None:
                continue
            rows.append({
                '@type': 'countdown',
                'time': g.value(event, EV['start']),
                'label': label,
            })
        context = {
            "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
            "label": "http://www.w3.org/2000/01/rdf-schema#label",
            "time": {
                "@id": "http://bigasterisk.com/event#time",
                "@type": "xsd:dateTime"
            },
            "xsd": "http://www.w3.org/2001/XMLSchema#",
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
        }
        self.set_header("content-type", "application/ld+json")
        self.write(json.dumps({"@context": context, "@graph": rows}))


class Metrics(cyclone.web.RequestHandler):
    """GET /metrics: the default prometheus REGISTRY in text exposition format."""

    def get(self):
        payload = generate_latest(REGISTRY)
        self.add_header('content-type', 'text/plain')
        self.write(payload)


def main():
    """Start the service.

    Steps:
      - read gcalendarwatch.conf (JSON) from the working directory;
      - connect to mongo;
      - seed the agenda and countdown graphs from already-stored events;
      - start the periodic Google Calendar poller;
      - serve HTTP on conf['serve_port'] until the twisted reactor stops.
    """
    args = docopt.docopt('''
Usage:
  gcalendarwatch [options]

Options:
  -v, --verbose  more logging
  --now          don't wait for first update
''')

    verboseLogging(args['--verbose'])

    agendaGraph = PatchableGraph()  # next few days
    countdownGraph = PatchableGraph()  # next n of starred events
    # use a context manager so the config file handle is closed promptly
    with open("gcalendarwatch.conf") as confFile:
        conf = json.load(confFile)
    m = conf['mongo']
    # tz_aware=True so stored datetimes come back tz-aware; the rest of the
    # module compares them against aware datetimes.
    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # Publish what mongo already has so the graphs are populated before the
    # first poll finishes.
    s, e = dayRange(60)
    sync.updateGraphs(read.getEvents(s, e))

    poller = Poller(sync, conf['minutes_between_polls'] * 60)
    if args['--now']:
        poller.updateNow()

    class Application(cyclone.web.Application):

        def __init__(self):
            handlers = [
                (r"/", Index),
                (r'/events', EventsPage),
                (r'/pollNow', PollNow),
                (r'/graph/calendar/upcoming', CycloneGraphHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/upcoming/events', CycloneGraphEventsHandler, {
                    'masterGraph': agendaGraph
                }),
                (r'/graph/calendar/countdown', CycloneGraphHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/graph/calendar/countdown/events', CycloneGraphEventsHandler, {
                    'masterGraph': countdownGraph
                }),
                (r'/countdowns.json', Countdowns),
                (r'/metrics', Metrics),
            ]
            cyclone.web.Application.__init__(
                self,
                handlers,
                conf=conf,
                read=read,
                poller=poller,
                agendaGraph=agendaGraph,
                countdownGraph=countdownGraph,
            )

    reactor.listenTCP(conf['serve_port'], Application())
    reactor.run()


if __name__ == '__main__':
    main()