view gcalendarwatch.py @ 42:7d9609edcf9c

track calendar feed summary/description text and emit them in graphs
author drewp@bigasterisk.com
date Sun, 18 Feb 2024 12:34:53 -0800
parents 119f0cb719eb
children b5d3d9a8c83d
line wrap: on
line source

"""
sync google calendar into mongodb, return queries from that as
JSON-LD.

gcalendarwatch.conf looks like this:
{
  "minutes_between_polls" : 60
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"},
}

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import datetime
import functools
import json
import logging
import re
import time
import traceback
from typing import Any, Dict, Iterable, Optional, cast

import background_loop
import pymongo.collection
from dateutil.tz import tzlocal
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from pymongo import MongoClient
from rdflib import RDF, Graph, Namespace, URIRef
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import HTMLResponse, JSONResponse, PlainTextResponse, Response
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from datetimemath import dayRange, parse
from graphconvert import asGraph
from ingest import SyncToMongo
from localtypes import Conf, Record

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

EV = Namespace("http://bigasterisk.com/event#")
"""
example:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""


def asJsonLd(events):
    ret = {'@graph': []}  # type: Dict[Any, Any]
    for ev in events:
        ev['startTime'] = ev['startTime'].astimezone(tzlocal()).isoformat()
        ev['endTime'] = ev['endTime'].astimezone(tzlocal()).isoformat()
        ev['@id'] = ev.pop('uri')
        ret['@graph'].append(ev)

    ret['@context'] = {
        "xsd": "http://www.w3.org/2001/XMLSchema#",
        "ev": "http://bigasterisk.com/event#",
        "startTime": {
            "@id": "ev:startTime",
            "@type": "xsd:dateTime"
        },
        "endTime": {
            "@id": "ev:endTime",
            "@type": "xsd:dateTime"
        },
        "startDate": {
            "@id": "ev:startDate",
            "@type": "xsd:date"
        },
        "endDate": {
            "@id": "ev:endDate",
            "@type": "xsd:date"
        },
        "title": "ev:title",
        "feed": {
            "@id": "ev:feed",
            "@type": "@id"
        },
        "htmlLink": {
            "@id": "ev:htmlLink",
            "@type": "@id"
        },
    }
    return ret


def starred(graph: Graph, ev: URIRef) -> Optional[str]:
    title = str(graph.value(ev, EV['title']))
    m = re.search(r'(.*)\*\s*$', title)
    if m:
        return m.group(1)
    else:
        return None


class ReadMongoEvents(object):
    """read events from mongodb"""

    def __init__(self, collection: pymongo.collection.Collection):
        self.collection = collection

    def getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> Iterable[Record]:
        if t1.tzinfo is None or t2.tzinfo is None:
            raise TypeError("tz-naive datetimes")
        for doc in self.collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]):
            doc['uri'] = doc.pop('_id')
            if 'feedId' in doc:
                doc['feed'] = URIRef('old_event')
            yield doc


def update(
        # this is incompletely type-checked:
        sync: SyncToMongo,  # curried by main
        first_run: bool,  # passed by background_loop
        cal=None  # sometimes passed by us
) -> int:
    log.info(f"updating {cal or 'all'}")
    try:
        n = sync.update(cal=cal)
    except Exception:
        traceback.print_exc()
        log.error("update failed")
        n = 0
    return n


# who uses this? pimscreen, frontdoor? they should use the GraphEvents version
def EventsPage(conf: Conf, read: ReadMongoEvents, req: Request) -> Response:
    t1, t2 = dayRange(int(req.query_params.get('days', default='2')))
    if req.query_params.get('t1', default=None):
        t1 = parse(req.query_params.get('t1', ''), tzinfo=tzlocal())
    if req.query_params.get('t2', default=None):
        t2 = parse(req.query_params.get('t2', ''), tzinfo=tzlocal())
    log.info(f'get /events local t1={t1} t2={t2}')
    return PlainTextResponse(media_type="text/n3", content=asGraph(conf, cals=[], events=read.getEvents(t1, t2)).serialize(format='n3'))


def Countdowns(countdownGraph, req: Request) -> Response:
    rows = []
    graph = countdownGraph._graph
    for ev in graph.subjects(RDF.type, EV['Event']):
        starLabel = starred(graph, ev)
        if starLabel is not None:
            rows.append({'@type': 'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel})
    return JSONResponse(media_type="application/ld+json",
                        content={
                            "@context": {
                                "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
                                "label": "http://www.w3.org/2000/01/rdf-schema#label",
                                "time": {
                                    "@id": "http://bigasterisk.com/event#time",
                                    "@type": "xsd:dateTime"
                                },
                                "xsd": "http://www.w3.org/2001/XMLSchema#",
                                "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
                            },
                            "@graph": rows,
                        })


def statusMsg(conf, loop: background_loop.Loop):
    period = conf['minutes_between_polls'] * 60
    ago = time.time() - loop.lastSuccessRun
    if not loop.everSucceeded:
        msg = "no completed updates %d sec after startup" % ago
        if ago > period * 1.1:
            raise ValueError(msg)
    else:
        msg = "last update was %d sec ago" % ago
        if ago > period * 1.1:
            raise ValueError(msg)
    return msg


async def PollNow(loop: background_loop.Loop, req: Request) -> PlainTextResponse:
    body = await req.body()
    cals = json.loads(body).get('cals', None) if body else [None]
    n = 0
    for cal in cals:
        n += await loop.runNow(cal=cal)
    msg = f"found {n} new records"
    log.info(msg)
    return PlainTextResponse(msg)


def main():
    agendaGraph = PatchableGraph()  # next few days
    countdownGraph = PatchableGraph()  # next n of starred events
    conf = cast(Conf, json.load(open("gcalendarwatch.conf")))
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    s, e = dayRange(60)
    sync.updateGraphs(read.getEvents(s, e))

    loop = background_loop.loop_forever(functools.partial(update, sync=sync), conf['minutes_between_polls'] * 60)

    def getRoot(request: Request) -> HTMLResponse:
        return HTMLResponse(content=open("index.html").read().replace("MSG", statusMsg(conf, loop)))

    app = Starlette(debug=True,
                    routes=[
                        Route('/', getRoot),
                        Route('/graph/calendar/upcoming', StaticGraph(agendaGraph)),
                        Route('/graph/calendar/upcoming/events', GraphEvents(agendaGraph)),
                        Route('/graph/calendar/countdown', StaticGraph(countdownGraph)),
                        Route('/graph/calendar/countdown/events', GraphEvents(countdownGraph)),
                        Route('/countdowns.json', functools.partial(Countdowns, countdownGraph)),
                        Route('/events', functools.partial(EventsPage, conf, read)),
                        Route('/pollNow', functools.partial(PollNow, loop), methods=['POST'])
                    ])

    app.add_middleware(PrometheusMiddleware, group_paths=True, filter_unhandled_paths=True, app_name='gcalendarwatch')
    app.add_route("/metrics", handle_metrics)
    return app


app = main()