Mercurial > code > home > repos > gcalendarwatch
changeset 85:f75b3a109b66
rewrite gcalendarwatch to read from calsync's mongo data only, not google services
author | drewp@bigasterisk.com |
---|---|
date | Sat, 07 Sep 2024 16:14:45 -0700 |
parents | 5f7ae444ecae |
children | c2578b4a8d9d |
files | Dockerfile gcalendarwatch.py graphconvert.py index.html ingest.py localtypes.py login.py |
diffstat | 7 files changed, 71 insertions(+), 272 deletions(-) [+] |
line wrap: on
line diff
--- a/Dockerfile Sat Sep 07 16:13:55 2024 -0700 +++ b/Dockerfile Sat Sep 07 16:14:45 2024 -0700 @@ -5,5 +5,4 @@ COPY pyproject.toml pdm.lock ./ RUN pdm install -COPY *.py *.conf *.html *.json ./ -COPY credentials credentials +COPY *.py *.conf *.html ./
--- a/gcalendarwatch.py Sat Sep 07 16:13:55 2024 -0700 +++ b/gcalendarwatch.py Sat Sep 07 16:14:45 2024 -0700 @@ -1,40 +1,33 @@ """ -sync google calendar into mongodb, return a few preset queries as RDF graphs +read calendar from mongodb, return a few preset queries as RDF graphs gcalendarwatch.conf looks like this: { - "minutes_between_polls" : 60 - "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"}, + "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname"}, } - -This should be updated to use -http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html -and update faster with less polling """ import datetime -import functools import json import logging -import time -import traceback -from typing import Iterable, cast +import re +from typing import Sequence, cast import background_loop import pymongo.collection +import pymongo.database from dateutil.tz import tzlocal from patchablegraph import PatchableGraph from patchablegraph.handler import GraphEvents, StaticGraph from pymongo import MongoClient -from rdflib import Namespace, URIRef +from rdflib import Namespace from starlette.applications import Starlette from starlette.requests import Request -from starlette.responses import HTMLResponse, PlainTextResponse +from starlette.responses import HTMLResponse from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics -from datetimemath import dayRange +from datetimemath import dayRange, limitDays from graphconvert import asGraph -from ingest import SyncToMongo from localtypes import Conf, Record logging.basicConfig(level=logging.INFO) @@ -63,87 +56,67 @@ }""" -class ReadMongoEvents(object): - """read events from mongodb""" - - def __init__(self, collection: pymongo.collection.Collection): - self.collection = collection - - def getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> Iterable[Record]: - if t1.tzinfo is None or t2.tzinfo is None: - raise TypeError("tz-naive datetimes") - for doc in self.collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]): - doc['uri'] = doc.pop('_id') - if 'feedId' in doc: - doc['feed'] = URIRef('old_event') - yield doc +def filterStarred(recs: Sequence[Record], maxCount=15) -> list[Record]: + recs = sorted(recs, key=lambda r: r['start']) + out = [] + for rec in recs: + if m := re.search(r'(.*)\*\s*$', rec['title']): + rec = rec.copy() + rec['title'] = m.group(1) + out.append(rec) + if len(out) >= maxCount: + break + return out -def update( - # this is incompletely type-checked: - sync: SyncToMongo, # curried by main - first_run: bool, # passed by background_loop - cal=None # sometimes passed by us -) -> int: - log.info(f"updating {cal or 'all'}") - try: - n = sync.update(cal=cal, days=120) - except Exception: - traceback.print_exc() - log.error("update failed") - n = 0 - return n +class SyncGraphsToMongo(object): + """reads mongodb (that calsync wrote); edits graphs""" + calendarsCollection: pymongo.collection.Collection + eventsCollection: pymongo.collection.Collection + def __init__(self, conf: Conf, db: pymongo.database.Database, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph, + currentEventsGraph: PatchableGraph): + self.conf = conf + self.eventsCollection = db.get_collection('test_gcalendar') + self.calendarsCollection = db.get_collection('test_gcalendar_cals') + + self.agendaGraph = agendaGraph + self.countdownGraph = countdownGraph + self.currentEventsGraph = currentEventsGraph -def statusMsg(conf, loop: background_loop.Loop): - period = conf['minutes_between_polls'] * 60 - ago = time.time() - loop.lastSuccessRun - if not loop.everSucceeded: - msg = "no completed updates %d sec after startup" % ago - if ago > period * 1.1: - raise ValueError(msg) - else: - msg = "last update was %d sec ago" % ago - if ago > period * 1.1: - raise ValueError(msg) - return msg - + def _getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> list[Record]: + if t1.tzinfo is None or t2.tzinfo is None: + raise TypeError("tz-naive datetimes") + return list(self.eventsCollection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)])) -async def PollNow(loop: background_loop.Loop, req: Request) -> PlainTextResponse: - body = await req.body() - cals = json.loads(body).get('cals', None) if body else [None] - n = 0 - for cal in cals: - n += await loop.runNow(cal=cal) - msg = f"found {n} new records" - log.info(msg) - return PlainTextResponse(msg) + def updateGraphs(self, first_run): + s, e = dayRange(120) + currentRecords = self._getEvents(s, e) + cals = list(self.calendarsCollection.find()) + self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2))) + self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']])) - -def updateCurrentEvents(conf: Conf, currentEventsGraph: PatchableGraph, collection: pymongo.collection.Collection, first_run: bool): - now = datetime.datetime.now(tzlocal()) - events = list(collection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}})) - currentEventsGraph.setToGraph(asGraph(conf, cals=[], events=events, extraClasses=[EV['CurrentEvent']])) + now = datetime.datetime.now(tzlocal()) + events = list(self.eventsCollection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}})) + self.currentEventsGraph.setToGraph(asGraph(self.conf, cals=[], events=events, extraClasses=[EV['CurrentEvent']])) def main(): - agendaGraph = PatchableGraph() # next few days - countdownGraph = PatchableGraph() # next n of starred events - currentEventsGraph = PatchableGraph() # events happening now + agendaGraph = PatchableGraph(label='agenda') # next few days + countdownGraph = PatchableGraph(label='countdown') # next n of starred events + currentEventsGraph = PatchableGraph(label='currentEvents') # events happening now + conf = cast(Conf, json.load(open("gcalendarwatch.conf"))) m = conf['mongo'] - mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']] - sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph) - read = ReadMongoEvents(mongoOut) + db = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']] + sync = SyncGraphsToMongo(conf, db, agendaGraph, countdownGraph, currentEventsGraph) - s, e = dayRange(120) - sync.updateGraphs(read.getEvents(s, e)) - - loop = background_loop.loop_forever(functools.partial(update, sync=sync), conf['minutes_between_polls'] * 60) - background_loop.loop_forever(functools.partial(updateCurrentEvents, conf, currentEventsGraph, mongoOut), 5, metric_prefix="current_events") + # todo: this should watch for mongodb edits, or get a signal from calsync + background_loop.loop_forever(sync.updateGraphs, 5, metric_prefix="update_graphs") def getRoot(request: Request) -> HTMLResponse: - return HTMLResponse(content=open("index.html").read().replace("MSG", statusMsg(conf, loop))) + return HTMLResponse(content=open("index.html").read()) + moreNs = { "": "http://bigasterisk.com/event#", "cal": "http://bigasterisk.com/calendar/", @@ -159,7 +132,6 @@ Route('/graph/calendar/countdown/events', GraphEvents(countdownGraph)), Route('/graph/currentEvents', StaticGraph(currentEventsGraph, moreNs)), Route('/graph/currentEvents/events', GraphEvents(currentEventsGraph)), - Route('/pollNow', functools.partial(PollNow, loop), methods=['POST']) ]) app.add_middleware(PrometheusMiddleware, group_paths=True, filter_unhandled_paths=True, app_name='gcalendarwatch')
--- a/graphconvert.py Sat Sep 07 16:13:55 2024 -0700 +++ b/graphconvert.py Sat Sep 07 16:14:45 2024 -0700 @@ -1,28 +1,30 @@ +from pprint import pformat, pprint from dateutil.tz import tzlocal from rdflib import RDF, RDFS, ConjunctiveGraph, Literal, Namespace, URIRef +import logging +from localtypes import Conf, Record -from localtypes import Conf, feedFromCalId - +log = logging.getLogger('conv') EV = Namespace("http://bigasterisk.com/event#") -def asGraph(conf: Conf, cals, events, extraClasses=[], ctxUri: URIRef = EV['gcalendar']): +def asGraph(conf: Conf, cals: list, events: list[Record], extraClasses=[], ctxUri: URIRef = EV['gcalendar']): ctx = ConjunctiveGraph(identifier=ctxUri) graph = ConjunctiveGraph() graph.namespace_manager.bind('ev', EV) for doc in cals: - feed = URIRef(feedFromCalId(conf, doc['_id'])) - graph.add((feed, RDF.type, EV['CalendarFeed'], ctx)) + cal = URIRef(doc['_id']) + graph.add((cal, RDF.type, EV['CalendarFeed'], ctx)) if doc['summary']: - graph.add((feed, RDFS.label, Literal(doc['summary'].strip()), ctx)) + graph.add((cal, RDFS.label, Literal(doc['summary'].strip()), ctx)) if doc['description']: - graph.add((feed, EV['description'], Literal(doc['description'].strip()), ctx)) + graph.add((cal, EV['description'], Literal(doc['description'].strip()), ctx)) for ev in events: - uri = URIRef(ev.get('uri', ev.get('_id'))) - if uri is None: - raise ValueError(f"{ev=} had no event uri") + uri = URIRef(ev['_id']) + + graph.add((URIRef(ev['calendarUrl']), EV['event'], uri, ctx)) def add(p: URIRef, o: URIRef | Literal): return graph.add((uri, p, o, ctx)) @@ -34,9 +36,7 @@ add(EV['start'], Literal(ev['start'])) add(EV['startDate'], Literal(ev['startDate'])) add(EV['end'], Literal(ev['end'])) - add(EV['feed'], URIRef(ev['feed'])) - # graph.add((feed, RDFS.label, Literal(ev['feedTitle']))) + add(EV['feed'], URIRef(ev['calendarUrl'])) if 'htmlLink' in ev: add(EV['htmlLink'], URIRef(ev['htmlLink'])) return graph -
--- a/index.html Sat Sep 07 16:13:55 2024 -0700 +++ b/index.html Sat Sep 07 16:14:45 2024 -0700 @@ -22,8 +22,6 @@ <div><a href="graph/calendar/countdown/events">graph/calendar/countdown/events</a></div> <div><a href="graph/currentEvents">graph/currentEvents</a></div> <div><a href="graph/currentEvents/events">graph/currentEvents/events</a></div> -<div><a href="countdowns.json">countdowns.json</a></div> -<div><a href="events">events</a></div> <div><a href="metrics">metrics</a></div> <script type="text/javascript" src="https://bigasterisk.com/lib/jquery-2.0.3.min.js"></script>
--- a/ingest.py Sat Sep 07 16:13:55 2024 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,153 +0,0 @@ -import logging -import os -import re -from typing import Any, Dict, Iterable, List, Sequence, cast - -import pymongo.collection -from dateutil.tz import tzlocal -from googleapiclient.discovery import Resource -from googleapiclient.errors import HttpError -from patchablegraph import PatchableGraph -from rdflib import Namespace - -from calendar_connection import getCalendarService -from datetimemath import dayRange, limitDays, parse -from graphconvert import asGraph -from localtypes import Conf, Record, feedFromCalId - -log = logging.getLogger() -EV = Namespace("http://bigasterisk.com/event#") - - -def _getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]: - for row in service.calendarList().list().execute()['items']: - yield row['id'], row.get('summary'), row.get('description') - - -def _recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: - - def dateOrTime(d): - if 'date' in d: - return d['date'] - return d['dateTime'] - - rec = { - 'uri': conf['event_uri_ns'] + ev['id'], - 'feed': feedFromCalId(conf, calId), - 'title': ev.get('summary', '?'), - 'start': dateOrTime(ev['start']), - 'end': dateOrTime(ev['end']), - 'endTimeUnspecified': ev.get('endTimeUnspecified', False), - 'htmlLink': ev.get('htmlLink', ''), - 'creatorEmail': ev.get('creator', {}).get('email', ''), - } - - for field, val in [('start', ev['start']), ('end', ev['end'])]: - if 'date' in val: - rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) - rec['%sDate' % field] = val['date'] - else: - rec['%sTime' % field] = parse(val['dateTime']) - rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() - return cast(Record, rec) - - -def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]: - recs = sorted(recs, key=lambda r: r['start']) - out = [] - for rec in recs: - if m:=re.search(r'(.*)\*\s*$', rec['title']): - rec = rec.copy() - rec['title'] = m.group(1) - out.append(rec) - if len(out) >= maxCount: - break - return out - - -class SyncToMongo(object): - """reads gcal, writes to mongodb and graphs""" - collection: pymongo.collection.Collection - - def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph): - self.conf = conf - self.service = getCalendarService() - self.collection = collection - self.calendarsCollection = collection.database.get_collection('gcalendar_cals') - self.agendaGraph = agendaGraph - self.countdownGraph = countdownGraph - - def update(self, days=10, cal=None) -> int: - start, end = dayRange(days) - idsFormerlyInRange = self._clearByStartTime(cal, start, end) - - totalNew, currentRecords = self._gatherNewEventsInRange(cal, start, end, idsFormerlyInRange) - - self.updateGraphs(currentRecords) - return totalNew - - def _gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange): - totalNew = 0 - currentRecords = [] - try: - cals = _getFirstPageOfCalendars(self.service) - except HttpError: - log.error('on getFirstPageOfCalendars') - os.abort() - for calId, summary, description in cals: - self.calendarsCollection.update_one({'_id': calId}, {'$set': { - 'summary': summary, - 'description': description, - }}, upsert=True) - if cal and calId != cal: - continue - try: - self._updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId) - except HttpError: - log.error(f"on cal {calId}") - return totalNew, currentRecords - - def _clearByStartTime(self, cal, start, end): - spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} - if cal is not None: - spec['feed'] = feedFromCalId(self.conf, cal) - idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] - n = self.collection.delete_many(spec) - log.info(f'cleared {n} records before reread') - return idsFormerlyInRange - - def _updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId): - print('read %s' % calId) - events = self.service.events().list( - calendarId=calId, - singleEvents=True, - timeMin=start.isoformat(), - timeMax=end.isoformat(), - showDeleted=False, - maxResults=1000, - ).execute() - - for ev in events['items']: - rec = _recordFromEv(self.conf, calId, ev) - self._upsertMongo(rec) - if rec['uri'] not in idsFormerlyInRange: - totalNew += 1 - currentRecords.append(rec) - - def _upsertMongo(self, rec: Record) -> List[Record]: - if self.collection.find_one({"_id": rec['uri']}) is not None: - log.debug("existing record %s", rec['uri']) - # this is not yet noticing updates - return [] - else: - log.debug("add record %s", rec) - d = cast(Dict[str, Any], rec.copy()) - d['_id'] = d.pop('uri') - self.collection.insert_one(d) - return [rec] - - def updateGraphs(self, currentRecords: Iterable[Record]): - currentRecords = list(currentRecords) - cals = list(self.calendarsCollection.find()) - self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2))) - self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))
--- a/localtypes.py Sat Sep 07 16:13:55 2024 -0700 +++ b/localtypes.py Sat Sep 07 16:14:45 2024 -0700 @@ -6,18 +6,15 @@ host: str port: int database: str - collection: str class Conf(TypedDict): - event_uri_ns: str - minutes_between_polls: float mongo: MongoConf class Record(TypedDict): - uri: str - feed: str + _id: str # uri + calendarUrl: str title: str start: int startTime: datetime.datetime @@ -28,7 +25,3 @@ endTimeUnspecified: bool htmlLink: str creatorEmail: str - - -def feedFromCalId(conf: Conf, calId: str) -> str: - return conf['event_uri_ns'] + 'feed/' + calId
--- a/login.py Sat Sep 07 16:13:55 2024 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,10 +0,0 @@ -"""run this to update auth files, then rebuild docker""" - -import logging - -from calendar_connection import getCalendarService - -logging.basicConfig(level=logging.DEBUG) -log = logging.getLogger() - -log.info(f'res={getCalendarService()}')