view gcalendarwatch.py @ 83:7f50e5bb30f5

ide
author drewp@bigasterisk.com
date Sat, 07 Sep 2024 16:12:15 -0700
parents e53a1bc87f99
children 5f7ae444ecae
line wrap: on
line source

"""
sync google calendar into mongodb, return a few preset queries as RDF graphs

gcalendarwatch.conf looks like this:
{
  "minutes_between_polls" : 60,
  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}
}

This should be updated to use
http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
and update faster with less polling
"""
import datetime
import functools
import json
import logging
import time
import traceback
from typing import Iterable, cast

import background_loop
import pymongo.collection
from dateutil.tz import tzlocal
from patchablegraph import PatchableGraph
from patchablegraph.handler import GraphEvents, StaticGraph
from pymongo import MongoClient
from rdflib import Namespace, URIRef
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import HTMLResponse, PlainTextResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

from datetimemath import dayRange
from graphconvert import asGraph
from ingest import SyncToMongo
from localtypes import Conf, Record

# Configure root logging at INFO; `log` is the root logger used by this module.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# RDF namespace for this service's event vocabulary (e.g. EV['CurrentEvent']).
EV = Namespace("http://bigasterisk.com/event#")
"""
example:
{
 'id': 'l640999999999999999999999c',
 'summary': 'sec.......',
 'start': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T16:00:00-07:00'},
 'end': {'timeZone': 'America/Los_Angeles', 'dateTime': '2014-09-25T17:00:00-07:00'},
 'endTimeUnspecified': True,
 'created': '2014-09-08T20:39:00.000Z',
 'creator': {'self': True, 'displayName': '...', 'email': '...'},
 'etag': '"2829999999999000"',
 'htmlLink': 'https://www.google.com/calendar/event?eid=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbEBt',
 'iCalUID': 'l640998888888888888888888888888888com',
 'kind': 'calendar#event',
 'organizer': {'self': True, 'displayName': '...', 'email': '...'},
 'reminders': {'useDefault': True},
 'sequence': 0,
 'status': 'confirmed',
 'updated': '2014-09-17T04:28:56.997Z',
}"""


class ReadMongoEvents:
    """Reader for event documents stored in a mongodb collection."""

    def __init__(self, collection: pymongo.collection.Collection):
        self.collection = collection

    def getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> Iterable[Record]:
        """Yield stored events with t1 <= startTime < t2, ordered by startTime.

        Each yielded document has its '_id' moved to the 'uri' key. Documents
        that carry a 'feedId' key additionally get 'feed' set to
        URIRef('old_event').

        Raises TypeError if either bound is tz-naive. Because this is a
        generator, the check runs when iteration starts, not at call time.
        """
        if any(bound.tzinfo is None for bound in (t1, t2)):
            raise TypeError("tz-naive datetimes")

        window = {"startTime": {"$gte": t1, "$lt": t2}}
        cursor = self.collection.find(window).sort([("startTime", 1)])
        for record in cursor:
            # expose mongo's primary key under the name the rest of the code uses
            record['uri'] = record.pop('_id')
            if 'feedId' in record:
                record['feed'] = URIRef('old_event')
            yield record


def update(
        # this is incompletely type-checked:
        sync: SyncToMongo,  # curried by main
        first_run: bool,  # passed by background_loop
        cal=None  # sometimes passed by us
) -> int:
    """Run one sync pass and return the count reported by sync.update.

    `cal` selects a single calendar; a falsy value is logged as 'all'.
    `first_run` is accepted for the background_loop calling convention and is
    not used here.

    Any exception from the sync is printed and logged rather than propagated,
    and 0 is returned in that case.
    """
    target = cal or 'all'
    log.info(f"updating {target}")
    try:
        return sync.update(cal=cal, days=120)
    except Exception:
        # best-effort: report the failure but keep the caller's loop alive
        traceback.print_exc()
        log.error("update failed")
        return 0


def statusMsg(conf, loop: background_loop.Loop):
    """Return a one-line freshness message for the polling loop.

    The message reports the seconds elapsed since loop.lastSuccessRun, worded
    differently depending on whether the loop has ever succeeded.

    Raises ValueError (carrying the same message) when that elapsed time is
    more than 1.1x the configured poll period.
    """
    period = conf['minutes_between_polls'] * 60
    elapsed = time.time() - loop.lastSuccessRun

    if loop.everSucceeded:
        text = "last update was %d sec ago" % elapsed
    else:
        text = "no completed updates %d sec after startup" % elapsed

    # allow 10% slack over the poll period before calling the data stale
    if elapsed > period * 1.1:
        raise ValueError(text)
    return text


async def PollNow(loop: background_loop.Loop, req: Request) -> PlainTextResponse:
    """Handle POST /pollNow: trigger an immediate sync and report the total.

    The request body is optional. When present it must be a JSON object; its
    'cals' entry is a list of calendars to poll one at a time. An empty body,
    a missing 'cals' key, or an explicit null 'cals' all poll once with
    cal=None (logged by `update` as 'all'). An explicitly empty list polls
    nothing.

    Returns a plain-text response with the summed results of loop.runNow.
    """
    body = await req.body()
    cals = json.loads(body).get('cals') if body else None
    if cals is None:
        # A JSON body without 'cals' used to leave cals=None and crash the
        # loop below with "'NoneType' object is not iterable"; treat it the
        # same as an empty body instead.
        cals = [None]

    n = 0
    for cal in cals:
        n += await loop.runNow(cal=cal)
    msg = f"found {n} new records"
    log.info(msg)
    return PlainTextResponse(msg)


def updateCurrentEvents(conf: Conf, currentEventsGraph: PatchableGraph, collection: pymongo.collection.Collection, first_run: bool):
    """Replace currentEventsGraph with the events in progress right now.

    An event counts as in progress when startTime <= now <= endTime, with
    `now` taken in the local timezone. Each event is additionally tagged with
    the EV['CurrentEvent'] class. `first_run` is accepted for the
    background_loop calling convention and is not used here.
    """
    rightNow = datetime.datetime.now(tzlocal())
    inProgress = {"startTime": {"$lte": rightNow}, "endTime": {"$gte": rightNow}}
    activeEvents = list(collection.find(inProgress))
    graph = asGraph(conf, cals=[], events=activeEvents, extraClasses=[EV['CurrentEvent']])
    currentEventsGraph.setToGraph(graph)


def main():
    """Wire up storage, background polling and HTTP routes; return the app.

    Steps:
      * load gcalendarwatch.conf and open the configured mongo collection
      * prime the agenda/countdown graphs from events already in mongo,
        over the range returned by dayRange(120)
      * start two background loops: the calendar sync (every
        minutes_between_polls * 60) and the current-events refresh (every 5)
      * build the Starlette app serving the status page, the three graphs
        (static and event-stream routes), /pollNow and /metrics
    """
    agendaGraph = PatchableGraph()  # next few days
    countdownGraph = PatchableGraph()  # next n of starred events
    currentEventsGraph = PatchableGraph()  # events happening now

    # close the config file promptly instead of leaking the handle
    with open("gcalendarwatch.conf") as confFile:
        conf = cast(Conf, json.load(confFile))
    m = conf['mongo']
    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
    read = ReadMongoEvents(mongoOut)

    # show what is already stored before the first poll completes
    s, e = dayRange(120)
    sync.updateGraphs(read.getEvents(s, e))

    loop = background_loop.loop_forever(functools.partial(update, sync=sync), conf['minutes_between_polls'] * 60)
    background_loop.loop_forever(functools.partial(updateCurrentEvents, conf, currentEventsGraph, mongoOut), 5, metric_prefix="current_events")

    def getRoot(request: Request) -> HTMLResponse:
        """Serve index.html with its MSG placeholder replaced by the status line."""
        # this runs per request, so the file must be closed each time
        with open("index.html") as page:
            template = page.read()
        # statusMsg raises ValueError when updates are stale, failing the request
        return HTMLResponse(content=template.replace("MSG", statusMsg(conf, loop)))

    app = Starlette(debug=True,
                    routes=[
                        Route('/', getRoot),
                        Route('/graph/calendar/upcoming', StaticGraph(agendaGraph)),
                        Route('/graph/calendar/upcoming/events', GraphEvents(agendaGraph)),
                        Route('/graph/calendar/countdown', StaticGraph(countdownGraph)),
                        Route('/graph/calendar/countdown/events', GraphEvents(countdownGraph)),
                        Route('/graph/currentEvents', StaticGraph(currentEventsGraph)),
                        Route('/graph/currentEvents/events', GraphEvents(currentEventsGraph)),
                        Route('/pollNow', functools.partial(PollNow, loop), methods=['POST'])
                    ])

    app.add_middleware(PrometheusMiddleware, group_paths=True, filter_unhandled_paths=True, app_name='gcalendarwatch')
    app.add_route("/metrics", handle_metrics)
    return app


# Module-level Starlette application, constructed (and background loops started) at import time.
app = main()