changeset 85:f75b3a109b66

rewrite gcalendarwatch to read from calsync's mongo data only, not google services
author drewp@bigasterisk.com
date Sat, 07 Sep 2024 16:14:45 -0700
parents 5f7ae444ecae
children c2578b4a8d9d
files Dockerfile gcalendarwatch.py graphconvert.py index.html ingest.py localtypes.py login.py
diffstat 7 files changed, 71 insertions(+), 272 deletions(-) [+]
line wrap: on
line diff
--- a/Dockerfile	Sat Sep 07 16:13:55 2024 -0700
+++ b/Dockerfile	Sat Sep 07 16:14:45 2024 -0700
@@ -5,5 +5,4 @@
 COPY pyproject.toml pdm.lock ./
 RUN pdm install
 
-COPY *.py *.conf *.html *.json ./
-COPY credentials credentials
+COPY *.py *.conf *.html ./
--- a/gcalendarwatch.py	Sat Sep 07 16:13:55 2024 -0700
+++ b/gcalendarwatch.py	Sat Sep 07 16:14:45 2024 -0700
@@ -1,40 +1,33 @@
 """
-sync google calendar into mongodb, return a few preset queries as RDF graphs
+read calendar from mongodb, return a few preset queries as RDF graphs
 
 gcalendarwatch.conf looks like this:
 {
-  "minutes_between_polls" : 60
-  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname", "collection" : "pim"},
+  "mongo" : {"host" : "h1", "port" : 27017, "database" : "dbname"},
 }
-
-This should be updated to use
-http://googledevelopers.blogspot.com/2013/07/google-calendar-api-push-notifications.html
-and update faster with less polling
 """
 import datetime
-import functools
 import json
 import logging
-import time
-import traceback
-from typing import Iterable, cast
+import re
+from typing import Sequence, cast
 
 import background_loop
 import pymongo.collection
+import pymongo.database
 from dateutil.tz import tzlocal
 from patchablegraph import PatchableGraph
 from patchablegraph.handler import GraphEvents, StaticGraph
 from pymongo import MongoClient
-from rdflib import Namespace, URIRef
+from rdflib import Namespace
 from starlette.applications import Starlette
 from starlette.requests import Request
-from starlette.responses import HTMLResponse, PlainTextResponse
+from starlette.responses import HTMLResponse
 from starlette.routing import Route
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
-from datetimemath import dayRange
+from datetimemath import dayRange, limitDays
 from graphconvert import asGraph
-from ingest import SyncToMongo
 from localtypes import Conf, Record
 
 logging.basicConfig(level=logging.INFO)
@@ -63,87 +56,67 @@
 }"""
 
 
-class ReadMongoEvents(object):
-    """read events from mongodb"""
-
-    def __init__(self, collection: pymongo.collection.Collection):
-        self.collection = collection
-
-    def getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> Iterable[Record]:
-        if t1.tzinfo is None or t2.tzinfo is None:
-            raise TypeError("tz-naive datetimes")
-        for doc in self.collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]):
-            doc['uri'] = doc.pop('_id')
-            if 'feedId' in doc:
-                doc['feed'] = URIRef('old_event')
-            yield doc
+def filterStarred(recs: Sequence[Record], maxCount=15) -> list[Record]:
+    recs = sorted(recs, key=lambda r: r['start'])
+    out = []
+    for rec in recs:
+        if m := re.search(r'(.*)\*\s*$', rec['title']):
+            rec = rec.copy()
+            rec['title'] = m.group(1)
+            out.append(rec)
+            if len(out) >= maxCount:
+                break
+    return out
 
 
-def update(
-        # this is incompletely type-checked:
-        sync: SyncToMongo,  # curried by main
-        first_run: bool,  # passed by background_loop
-        cal=None  # sometimes passed by us
-) -> int:
-    log.info(f"updating {cal or 'all'}")
-    try:
-        n = sync.update(cal=cal, days=120)
-    except Exception:
-        traceback.print_exc()
-        log.error("update failed")
-        n = 0
-    return n
+class SyncGraphsToMongo(object):
+    """reads mongodb (that calsync wrote); edits graphs"""
+    calendarsCollection: pymongo.collection.Collection
+    eventsCollection: pymongo.collection.Collection
 
+    def __init__(self, conf: Conf, db: pymongo.database.Database, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph,
+                 currentEventsGraph: PatchableGraph):
+        self.conf = conf
+        self.eventsCollection = db.get_collection('test_gcalendar')
+        self.calendarsCollection = db.get_collection('test_gcalendar_cals')
+
+        self.agendaGraph = agendaGraph
+        self.countdownGraph = countdownGraph
+        self.currentEventsGraph = currentEventsGraph
 
-def statusMsg(conf, loop: background_loop.Loop):
-    period = conf['minutes_between_polls'] * 60
-    ago = time.time() - loop.lastSuccessRun
-    if not loop.everSucceeded:
-        msg = "no completed updates %d sec after startup" % ago
-        if ago > period * 1.1:
-            raise ValueError(msg)
-    else:
-        msg = "last update was %d sec ago" % ago
-        if ago > period * 1.1:
-            raise ValueError(msg)
-    return msg
-
+    def _getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> list[Record]:
+        if t1.tzinfo is None or t2.tzinfo is None:
+            raise TypeError("tz-naive datetimes")
+        return list(self.eventsCollection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]))
 
-async def PollNow(loop: background_loop.Loop, req: Request) -> PlainTextResponse:
-    body = await req.body()
-    cals = json.loads(body).get('cals', None) if body else [None]
-    n = 0
-    for cal in cals:
-        n += await loop.runNow(cal=cal)
-    msg = f"found {n} new records"
-    log.info(msg)
-    return PlainTextResponse(msg)
+    def updateGraphs(self, first_run):
+        s, e = dayRange(120)
+        currentRecords = self._getEvents(s, e)
+        cals = list(self.calendarsCollection.find())
+        self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2)))
+        self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))
 
-
-def updateCurrentEvents(conf: Conf, currentEventsGraph: PatchableGraph, collection: pymongo.collection.Collection, first_run: bool):
-    now = datetime.datetime.now(tzlocal())
-    events = list(collection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}}))
-    currentEventsGraph.setToGraph(asGraph(conf, cals=[], events=events, extraClasses=[EV['CurrentEvent']]))
+        now = datetime.datetime.now(tzlocal())
+        events = list(self.eventsCollection.find({"startTime": {"$lte": now}, "endTime": {"$gte": now}}))
+        self.currentEventsGraph.setToGraph(asGraph(self.conf, cals=[], events=events, extraClasses=[EV['CurrentEvent']]))
 
 
 def main():
-    agendaGraph = PatchableGraph()  # next few days
-    countdownGraph = PatchableGraph()  # next n of starred events
-    currentEventsGraph = PatchableGraph()  # events happening now
+    agendaGraph = PatchableGraph(label='agenda')  # next few days
+    countdownGraph = PatchableGraph(label='countdown')  # next n of starred events
+    currentEventsGraph = PatchableGraph(label='currentEvents')  # events happening now
+
     conf = cast(Conf, json.load(open("gcalendarwatch.conf")))
     m = conf['mongo']
-    mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
-    sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
-    read = ReadMongoEvents(mongoOut)
+    db = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']]
+    sync = SyncGraphsToMongo(conf, db, agendaGraph, countdownGraph, currentEventsGraph)
 
-    s, e = dayRange(120)
-    sync.updateGraphs(read.getEvents(s, e))
-
-    loop = background_loop.loop_forever(functools.partial(update, sync=sync), conf['minutes_between_polls'] * 60)
-    background_loop.loop_forever(functools.partial(updateCurrentEvents, conf, currentEventsGraph, mongoOut), 5, metric_prefix="current_events")
+    # todo: this should watch for mongodb edits, or get a signal from calsync
+    background_loop.loop_forever(sync.updateGraphs, 5, metric_prefix="update_graphs")
 
     def getRoot(request: Request) -> HTMLResponse:
-        return HTMLResponse(content=open("index.html").read().replace("MSG", statusMsg(conf, loop)))
+        return HTMLResponse(content=open("index.html").read())
+
     moreNs = {
         "": "http://bigasterisk.com/event#",
         "cal": "http://bigasterisk.com/calendar/",
@@ -159,7 +132,6 @@
                         Route('/graph/calendar/countdown/events', GraphEvents(countdownGraph)),
                         Route('/graph/currentEvents', StaticGraph(currentEventsGraph, moreNs)),
                         Route('/graph/currentEvents/events', GraphEvents(currentEventsGraph)),
-                        Route('/pollNow', functools.partial(PollNow, loop), methods=['POST'])
                     ])
 
     app.add_middleware(PrometheusMiddleware, group_paths=True, filter_unhandled_paths=True, app_name='gcalendarwatch')
--- a/graphconvert.py	Sat Sep 07 16:13:55 2024 -0700
+++ b/graphconvert.py	Sat Sep 07 16:14:45 2024 -0700
@@ -1,28 +1,30 @@
+from pprint import pformat, pprint
 from dateutil.tz import tzlocal
 from rdflib import RDF, RDFS, ConjunctiveGraph, Literal, Namespace, URIRef
+import logging
+from localtypes import Conf, Record
 
-from localtypes import Conf, feedFromCalId
-
+log = logging.getLogger('conv')
 EV = Namespace("http://bigasterisk.com/event#")
 
 
-def asGraph(conf: Conf, cals, events, extraClasses=[], ctxUri: URIRef = EV['gcalendar']):
+def asGraph(conf: Conf, cals: list, events: list[Record], extraClasses=[], ctxUri: URIRef = EV['gcalendar']):
     ctx = ConjunctiveGraph(identifier=ctxUri)
     graph = ConjunctiveGraph()
     graph.namespace_manager.bind('ev', EV)
 
     for doc in cals:
-        feed = URIRef(feedFromCalId(conf, doc['_id']))
-        graph.add((feed, RDF.type, EV['CalendarFeed'], ctx))
+        cal = URIRef(doc['_id'])
+        graph.add((cal, RDF.type, EV['CalendarFeed'], ctx))
         if doc['summary']:
-            graph.add((feed, RDFS.label, Literal(doc['summary'].strip()), ctx))
+            graph.add((cal, RDFS.label, Literal(doc['summary'].strip()), ctx))
         if doc['description']:
-            graph.add((feed, EV['description'], Literal(doc['description'].strip()), ctx))
+            graph.add((cal, EV['description'], Literal(doc['description'].strip()), ctx))
 
     for ev in events:
-        uri = URIRef(ev.get('uri', ev.get('_id')))
-        if uri is None:
-            raise ValueError(f"{ev=} had no event uri")
+        uri = URIRef(ev['_id'])
+
+        graph.add((URIRef(ev['calendarUrl']), EV['event'], uri, ctx))
 
         def add(p: URIRef, o: URIRef | Literal):
             return graph.add((uri, p, o, ctx))
@@ -34,9 +36,7 @@
         add(EV['start'], Literal(ev['start']))
         add(EV['startDate'], Literal(ev['startDate']))
         add(EV['end'], Literal(ev['end']))
-        add(EV['feed'], URIRef(ev['feed']))
-        # graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
+        add(EV['feed'], URIRef(ev['calendarUrl']))
         if 'htmlLink' in ev:
             add(EV['htmlLink'], URIRef(ev['htmlLink']))
     return graph
-
--- a/index.html	Sat Sep 07 16:13:55 2024 -0700
+++ b/index.html	Sat Sep 07 16:14:45 2024 -0700
@@ -22,8 +22,6 @@
 <div><a href="graph/calendar/countdown/events">graph/calendar/countdown/events</a></div>
 <div><a href="graph/currentEvents">graph/currentEvents</a></div>
 <div><a href="graph/currentEvents/events">graph/currentEvents/events</a></div>
-<div><a href="countdowns.json">countdowns.json</a></div>
-<div><a href="events">events</a></div>
 <div><a href="metrics">metrics</a></div>
 
 	<script type="text/javascript" src="https://bigasterisk.com/lib/jquery-2.0.3.min.js"></script>
--- a/ingest.py	Sat Sep 07 16:13:55 2024 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,153 +0,0 @@
-import logging
-import os
-import re
-from typing import Any, Dict, Iterable, List, Sequence, cast
-
-import pymongo.collection
-from dateutil.tz import tzlocal
-from googleapiclient.discovery import Resource
-from googleapiclient.errors import HttpError
-from patchablegraph import PatchableGraph
-from rdflib import Namespace
-
-from calendar_connection import getCalendarService
-from datetimemath import dayRange, limitDays, parse
-from graphconvert import asGraph
-from localtypes import Conf, Record, feedFromCalId
-
-log = logging.getLogger()
-EV = Namespace("http://bigasterisk.com/event#")
-
-
-def _getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]:
-    for row in service.calendarList().list().execute()['items']:
-        yield row['id'], row.get('summary'), row.get('description')
-
-
-def _recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
-
-    def dateOrTime(d):
-        if 'date' in d:
-            return d['date']
-        return d['dateTime']
-
-    rec = {
-        'uri': conf['event_uri_ns'] + ev['id'],
-        'feed': feedFromCalId(conf, calId),
-        'title': ev.get('summary', '?'),
-        'start': dateOrTime(ev['start']),
-        'end': dateOrTime(ev['end']),
-        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
-        'htmlLink': ev.get('htmlLink', ''),
-        'creatorEmail': ev.get('creator', {}).get('email', ''),
-    }
-
-    for field, val in [('start', ev['start']), ('end', ev['end'])]:
-        if 'date' in val:
-            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
-            rec['%sDate' % field] = val['date']
-        else:
-            rec['%sTime' % field] = parse(val['dateTime'])
-            rec['%sDate' % field] = parse(val['dateTime']).date().isoformat()
-    return cast(Record, rec)
-
-
-def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
-    recs = sorted(recs, key=lambda r: r['start'])
-    out = []
-    for rec in recs:
-        if m:=re.search(r'(.*)\*\s*$', rec['title']):
-            rec = rec.copy()
-            rec['title'] = m.group(1)
-            out.append(rec)
-            if len(out) >= maxCount:
-                break
-    return out
-
-
-class SyncToMongo(object):
-    """reads gcal, writes to mongodb and graphs"""
-    collection: pymongo.collection.Collection
-
-    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
-        self.conf = conf
-        self.service = getCalendarService()
-        self.collection = collection
-        self.calendarsCollection = collection.database.get_collection('gcalendar_cals')
-        self.agendaGraph = agendaGraph
-        self.countdownGraph = countdownGraph
-
-    def update(self, days=10, cal=None) -> int:
-        start, end = dayRange(days)
-        idsFormerlyInRange = self._clearByStartTime(cal, start, end)
-
-        totalNew, currentRecords = self._gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)
-
-        self.updateGraphs(currentRecords)
-        return totalNew
-
-    def _gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
-        totalNew = 0
-        currentRecords = []
-        try:
-            cals = _getFirstPageOfCalendars(self.service)
-        except HttpError:
-            log.error('on getFirstPageOfCalendars')
-            os.abort()
-        for calId, summary, description in cals:
-            self.calendarsCollection.update_one({'_id': calId}, {'$set': {
-                'summary': summary,
-                'description': description,
-            }}, upsert=True)
-            if cal and calId != cal:
-                continue
-            try:
-                self._updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId)
-            except HttpError:
-                log.error(f"on cal {calId}")
-        return totalNew, currentRecords
-
-    def _clearByStartTime(self, cal, start, end):
-        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
-        if cal is not None:
-            spec['feed'] = feedFromCalId(self.conf, cal)
-        idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
-        n = self.collection.delete_many(spec)
-        log.info(f'cleared {n} records before reread')
-        return idsFormerlyInRange
-
-    def _updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId):
-        print('read %s' % calId)
-        events = self.service.events().list(
-            calendarId=calId,
-            singleEvents=True,
-            timeMin=start.isoformat(),
-            timeMax=end.isoformat(),
-            showDeleted=False,
-            maxResults=1000,
-        ).execute()
-
-        for ev in events['items']:
-            rec = _recordFromEv(self.conf, calId, ev)
-            self._upsertMongo(rec)
-            if rec['uri'] not in idsFormerlyInRange:
-                totalNew += 1
-            currentRecords.append(rec)
-
-    def _upsertMongo(self, rec: Record) -> List[Record]:
-        if self.collection.find_one({"_id": rec['uri']}) is not None:
-            log.debug("existing record %s", rec['uri'])
-            # this is not yet noticing updates
-            return []
-        else:
-            log.debug("add record %s", rec)
-            d = cast(Dict[str, Any], rec.copy())
-            d['_id'] = d.pop('uri')
-            self.collection.insert_one(d)
-            return [rec]
-
-    def updateGraphs(self, currentRecords: Iterable[Record]):
-        currentRecords = list(currentRecords)
-        cals = list(self.calendarsCollection.find())
-        self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2)))
-        self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))
--- a/localtypes.py	Sat Sep 07 16:13:55 2024 -0700
+++ b/localtypes.py	Sat Sep 07 16:14:45 2024 -0700
@@ -6,18 +6,15 @@
     host: str
     port: int
     database: str
-    collection: str
 
 
 class Conf(TypedDict):
-    event_uri_ns: str
-    minutes_between_polls: float
     mongo: MongoConf
 
 
 class Record(TypedDict):
-    uri: str
-    feed: str
+    _id: str  # uri
+    calendarUrl: str
     title: str
     start: int
     startTime: datetime.datetime
@@ -28,7 +25,3 @@
     endTimeUnspecified: bool
     htmlLink: str
     creatorEmail: str
-
-
-def feedFromCalId(conf: Conf, calId: str) -> str:
-    return conf['event_uri_ns'] + 'feed/' + calId
--- a/login.py	Sat Sep 07 16:13:55 2024 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,10 +0,0 @@
-"""run this to update auth files, then rebuild docker"""
-
-import logging
-
-from calendar_connection import getCalendarService
-
-logging.basicConfig(level=logging.DEBUG)
-log = logging.getLogger()
-
-log.info(f'res={getCalendarService()}')