changeset 28:e2209226b001

rewrite with starlette and background_loop
author drewp@bigasterisk.com
date Sun, 24 Jul 2022 00:58:54 -0700
parents 87ab41b8ed52
children 32fd3bd77ff2
files calendar_connection.py datetimemath.py gcalendarwatch.py graphconvert.py ingest.py localtypes.py
diffstat 6 files changed, 317 insertions(+), 332 deletions(-) [+]
line wrap: on
line diff
--- a/calendar_connection.py	Sun Jul 24 00:58:07 2022 -0700
+++ b/calendar_connection.py	Sun Jul 24 00:58:54 2022 -0700
@@ -3,12 +3,12 @@
 
 from google.auth.transport.requests import Request
 from google_auth_oauthlib.flow import InstalledAppFlow
-from googleapiclient.discovery import build
+from googleapiclient.discovery import build, Resource
 
 SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
 
 
-def getCalendarService(scope='https://www.googleapis.com/auth/calendar.readonly'):
+def getCalendarService(scope='https://www.googleapis.com/auth/calendar.readonly') -> Resource:
     """
     """
     creds = None
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/datetimemath.py	Sun Jul 24 00:58:54 2022 -0700
@@ -0,0 +1,26 @@
+import datetime
+from typing import Iterable, List, Tuple, cast
+
+import dateutil.parser
+from dateutil.tz.tz import tzlocal
+
+from localtypes import Record
+
+
+def dayRange(days: float) -> Tuple[datetime.datetime, datetime.datetime]:
+    now = datetime.datetime.now(tzlocal())
+    start = now - datetime.timedelta(hours=12)
+    end = now + datetime.timedelta(days=days)
+    return start, end
+
+
+def limitDays(recs: Iterable[Record], days: float) -> List[Record]:
+    start, end = dayRange(days)
+    start = start - datetime.timedelta(hours=12)
+    # incomplete
+    return [r for r in recs if r['startTime'] < end and r['endTime'] > start]
+
+
+# just a typing fix
+def parse(s: str, **kw) -> datetime.datetime:
+    return cast(datetime.datetime, dateutil.parser.parse(s, **kw))
--- a/gcalendarwatch.py	Sun Jul 24 00:58:07 2022 -0700
+++ b/gcalendarwatch.py	Sun Jul 24 00:58:54 2022 -0700
@@ -13,33 +13,33 @@
 and update faster with less polling
 """
 import datetime
+import functools
 import json
+import logging
 import re
 import time
 import traceback
+from typing import Any, Dict, Iterable, Optional, cast
 
-import cyclone.web
-import pymongo.collection
-from dateutil.parser import parse
 from dateutil.tz import tzlocal
-from patchablegraph import (
-    CycloneGraphEventsHandler,
-    CycloneGraphHandler,
-    PatchableGraph,
-)
-from calendar_connection import getCalendarService
-from typing import Dict, Any
+from patchablegraph import PatchableGraph
+from patchablegraph.handler import GraphEvents, StaticGraph
 from pymongo import MongoClient
-from rdflib import Graph, Literal, Namespace, RDF, URIRef
-from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor
+from rdflib import RDF, Graph, Namespace, URIRef
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import (HTMLResponse, JSONResponse, PlainTextResponse, Response)
+from starlette.routing import Route
+from starlette_exporter import PrometheusMiddleware, handle_metrics
 
-import docopt
-from prometheus_client import Summary
-from prometheus_client.exposition import generate_latest
-from prometheus_client.registry import REGISTRY
+import background_loop_local as background_loop
+from datetimemath import dayRange, parse
+from graphconvert import asGraph
+from ingest import SyncToMongo
+from localtypes import Conf, Record
 
-UPDATE = Summary('gcalendarwatch_updates', 'update loop calls')
+logging.basicConfig(level=logging.INFO)
+log = logging.getLogger()
 
 EV = Namespace("http://bigasterisk.com/event#")
 """
@@ -64,38 +64,6 @@
 }"""
 
 
-def feedFromCalId(conf, calId):
-    return conf['event_uri_ns'] + 'feed/' + calId
-
-
-def recordFromEv(conf: Dict, calId: str, ev: Dict):
-
-    def dateOrTime(d):
-        if 'date' in d:
-            return d['date']
-        return d['dateTime']
-
-    rec = {
-        'uri': conf['event_uri_ns'] + ev['id'],
-        'feed': feedFromCalId(conf, calId),
-        'title': ev.get('summary', '?'),
-        'start': dateOrTime(ev['start']),
-        'end': dateOrTime(ev['end']),
-        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
-        'htmlLink': ev.get('htmlLink', ''),
-        'creatorEmail': ev.get('creator', {}).get('email', ''),
-    }
-
-    for field, val in [('start', ev['start']), ('end', ev['end'])]:
-        if 'date' in val:
-            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
-            rec['%sDate' % field] = val['date']
-        else:
-            rec['%sTime' % field] = parse(val['dateTime'])
-            rec['%sDate' % field] = parse(val['dateTime']).date().isoformat()
-    return rec
-
-
 def asJsonLd(events):
     ret = {'@graph': []}  # type: Dict[Any, Any]
     for ev in events:
@@ -136,49 +104,7 @@
     return ret
 
 
-def asGraph(events, extraClasses=[]):
-    graph = Graph()
-    graph.namespace_manager.bind('ev', EV)
-    for ev in events:
-        uri = URIRef(ev['uri'])
-
-        def add(p, o):
-            return graph.add((uri, p, o))
-
-        add(RDF.type, EV['Event'])
-        for cls in extraClasses:
-            add(RDF.type, cls)
-        add(EV['title'], Literal(ev['title']))
-        add(EV['start'], Literal(ev['start']))
-        add(EV['startDate'], Literal(ev['startDate']))
-        add(EV['end'], Literal(ev['end']))
-        add(EV['feed'], URIRef(ev['feed']))
-        # graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
-        if 'htmlLink' in ev:
-            add(EV['htmlLink'], URIRef(ev['htmlLink']))
-    return graph
-
-
-def getFirstPageOfCalendars(service):
-    for row in service.calendarList().list().execute()['items']:
-        yield row['id']
-
-
-def dayRange(days):
-    now = datetime.datetime.now(tzlocal())
-    start = now - datetime.timedelta(hours=12)
-    end = now + datetime.timedelta(days=days)
-    return start, end
-
-
-def limitDays(recs, days):
-    start, end = dayRange(days)
-    start = start - datetime.timedelta(hours=12)
-    # incomplete
-    return [r for r in recs if r['startTime'] < end and r['endTime'] > start]
-
-
-def starred(graph, ev):
+def starred(graph: Graph, ev: URIRef) -> Optional[str]:
     title = graph.value(ev, EV['title'])
     m = re.search(r'(.*)\*\s*$', title)
     if m:
@@ -187,89 +113,13 @@
         return None
 
 
-def filterStarred(recs, maxCount=15):
-    recs = sorted(recs, key=lambda r: r['start'])
-    out = []
-    for rec in recs:
-        if re.search(r'(.*)\*\s*$', rec['title']):
-            out.append(rec)
-            if len(out) >= maxCount:
-                break
-    return out
-
-
-class SyncToMongo(object):
-    """reads gcal, writes to mongodb"""
-    collection: pymongo.collection.Collection
-
-    def __init__(self, conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
-        self.conf = conf
-        self.service = getCalendarService()
-        self.collection = collection
-        self.agendaGraph = agendaGraph
-        self.countdownGraph = countdownGraph
-
-    @UPDATE.time()
-    def update(self, days=10, cal=None) -> int:
-        start, end = dayRange(days)
-        spec = {"startTime": {"$gte": start, "$lte": end}}
-        if cal is not None:
-            spec['feed'] = feedFromCalId(self.conf, cal)
-        idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
-        n = self.collection.delete_many(spec)
-        log.info(f'cleared {n} records before reread')
-
-        totalNew = 0
-        currentRecords = []
-        for calId in getFirstPageOfCalendars(self.service):
-            if cal and calId != cal:
-                continue
-            print('read %s' % calId)
-            events = self.service.events().list(
-                calendarId=calId,
-                singleEvents=True,
-                timeMin=start.isoformat(),
-                timeMax=end.isoformat(),
-                showDeleted=False,
-                maxResults=1000,
-            ).execute()
-
-            for ev in events['items']:
-                rec = recordFromEv(self.conf, calId, ev)
-                self.upsertMongo(rec)
-                if rec['uri'] not in idsFormerlyInRange:
-                    totalNew += 1
-                currentRecords.append(rec)
-
-        self.updateGraphs(currentRecords)
-        return totalNew
-
-    def upsertMongo(self, rec):
-        if self.collection.find_one({"_id": rec['uri']}) is not None:
-            log.debug("existing record %s", rec['uri'])
-            # this is not yet noticing updates
-            return []
-        else:
-            log.debug("add record %s", rec)
-            d = rec.copy()
-            d['_id'] = d.pop('uri')
-            self.collection.insert_one(d)
-            return [rec]
-
-    def updateGraphs(self, currentRecords):
-        c = EV['gcalendar']
-        currentRecords = list(currentRecords)
-        self.agendaGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(limitDays(currentRecords, days=2))])
-        self.countdownGraph.setToGraph([(s, p, o, c) for s, p, o in asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']])])
-
-
 class ReadMongoEvents(object):
     """read events from mongodb"""
 
     def __init__(self, collection):
         self.collection = collection
 
-    def getEvents(self, t1, t2):
+    def getEvents(self, t1: datetime.datetime, t2: datetime.datetime) -> Iterable[Record]:
         if t1.tzinfo is None or t2.tzinfo is None:
             raise TypeError("tz-naive datetimes")
         for doc in self.collection.find({"startTime": {"$gte": t1, "$lt": t2}}).sort([("startTime", 1)]):
@@ -279,142 +129,85 @@
             yield doc
 
 
-class Poller(object):
-
-    def __init__(self, sync, periodSec):
-        self.sync = sync
-        self.lastUpdateTime = time.time()
-        self.everUpdated = False
-        self.periodSec = periodSec
-        self.scheduled = reactor.callLater(self.periodSec, self._updateLoop)
-        self.events = None
-
-    def updateNow(self, cal=None) -> int:
-        self.scheduled.cancel()
-        return self._updateLoop(cal)
-
-    def _updateLoop(self, cal=None) -> int:
-        log.info(f"updating {cal or 'all'}")
-        t1 = time.time()
-        try:
-            n = self.sync.update(cal=cal)
-        except Exception:
-            traceback.print_exc()
-            log.error("updated failed")
-            n = 0
-        self.lastUpdateTime = t1
-        self.everUpdated = True
-        took = time.time() - t1
-        self.scheduled = reactor.callLater(max(3, self.periodSec - took), self._updateLoop)
-        return n
+def update(
+        # this is incompletely type-checked:
+        sync: SyncToMongo,  # curried by main
+        first_run: bool,  # passed by background_loop
+        cal=None  # sometimes passed by us
+) -> int:
+    log.info(f"updating {cal or 'all'}")
+    try:
+        n = sync.update(cal=cal)
+    except Exception:
+        traceback.print_exc()
+        log.error("update failed")
+        n = 0
+    return n
 
 
-class PollNow(cyclone.web.RequestHandler):
-
-    def post(self):
-        cals = json.loads(self.request.body).get('cals', None) if self.request.body else None
-        n = 0
-        for cal in cals:
-            n += self.settings.poller.updateNow(cal)
-        msg = f"found {n} new records"
-        log.info(msg)
-        self.write(msg.encode('utf8'))
+# who uses this? pimscreen, frontdoor? they should use the GraphEvents version
+def EventsPage(read: ReadMongoEvents, req: Request) -> Response:
+    t1, t2 = dayRange(int(req.query_params.get('days', default='2')))
+    if req.query_params.get('t1', default=None):
+        t1 = parse(req.query_params.get('t1', ''), tzinfo=tzlocal())
+    if req.query_params.get('t2', default=None):
+        t2 = parse(req.query_params.get('t2', ''), tzinfo=tzlocal())
+    log.info(f'get /events local t1={t1} t2={t2}')
+    return PlainTextResponse(media_type="text/n3", content=asGraph(read.getEvents(t1, t2)).serialize(format='n3'))
 
 
-class Index(cyclone.web.RequestHandler):
-
-    def get(self):
-        period = self.settings.conf['minutes_between_polls'] * 60
-        ago = time.time() - self.settings.poller.lastUpdateTime
-        if not self.settings.poller.everUpdated:
-            msg = "no completed updates %d sec after startup" % ago
-            if ago > period * 1.1:
-                raise ValueError(msg)
-        else:
-            msg = "last update was %d sec ago" % ago
-            if ago > period * 1.1:
-                raise ValueError(msg)
-        self.set_header("content-type", "text/html")
-        self.write(open("index.html").read().replace("MSG", msg))
+def Countdowns(countdownGraph, req: Request) -> Response:
+    rows = []
+    graph = countdownGraph._graph
+    for ev in graph.subjects(RDF.type, EV['Event']):
+        starLabel = starred(graph, ev)
+        if starLabel is not None:
+            rows.append({'@type': 'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel})
+    return JSONResponse(media_type="application/ld+json",
+                        content={
+                            "@context": {
+                                "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
+                                "label": "http://www.w3.org/2000/01/rdf-schema#label",
+                                "time": {
+                                    "@id": "http://bigasterisk.com/event#time",
+                                    "@type": "xsd:dateTime"
+                                },
+                                "xsd": "http://www.w3.org/2001/XMLSchema#",
+                                "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
+                            },
+                            "@graph": rows,
+                        })
 
 
-class EventsPage(cyclone.web.RequestHandler):
-
-    def get(self):
-        """
-        upcoming events as JSON-LD
-        """
-        arg = self.get_argument
-        t1, t2 = dayRange(int(arg('days', default='2')))
-        if arg('t1', default=None):
-            t1 = parse(arg('t1'), tzinfo=tzlocal())
-        if arg('t2', default=None):
-            t2 = parse(arg('t2'), tzinfo=tzlocal())
-        log.info(f'get /events local t1={t1} t2={t2}')
-        if 0:
-            self.set_header("content-type", "application/ld+json")
-            self.write(asJsonLd(self.settings.read.getEvents(t1, t2)))
-        else:
-            self.set_header("content-type", "text/n3")
-            self.write(asGraph(self.settings.read.getEvents(t1, t2)).serialize(format='n3'))
+def statusMsg(conf, loop: background_loop.Loop):
+    period = conf['minutes_between_polls'] * 60
+    ago = time.time() - loop.lastSuccessRun
+    if not loop.everSucceeded:
+        msg = "no completed updates %d sec after startup" % ago
+        if ago > period * 1.1:
+            raise ValueError(msg)
+    else:
+        msg = "last update was %d sec ago" % ago
+        if ago > period * 1.1:
+            raise ValueError(msg)
+    return msg
 
 
-class Countdowns(cyclone.web.RequestHandler):
-
-    def get(self):
-        rows = []
-        graph = self.settings.countdownGraph._graph
-        for ev in graph.subjects(RDF.type, EV['Event']):
-            starLabel = starred(graph, ev)
-            if starLabel is not None:
-                rows.append({'@type': 'countdown', 'time': graph.value(ev, EV['start']), 'label': starLabel})
-        self.set_header("content-type", "application/ld+json")
-        self.write(
-            json.dumps({
-                "@context": {
-                    "countdown": "http://bigasterisk.com/countdown#CountdownEvent",
-                    "label": "http://www.w3.org/2000/01/rdf-schema#label",
-                    "time": {
-                        "@id": "http://bigasterisk.com/event#time",
-                        "@type": "xsd:dateTime"
-                    },
-                    "xsd": "http://www.w3.org/2001/XMLSchema#",
-                    "rdfs": "http://www.w3.org/2000/01/rdf-schema#"
-                },
-                "@graph": rows,
-            }))
-
-
-class Metrics(cyclone.web.RequestHandler):
-
-    def get(self):
-        self.add_header('content-type', 'text/plain')
-        self.write(generate_latest(REGISTRY))
+async def PollNow(loop: background_loop.Loop, req: Request) -> PlainTextResponse:
+    body = await req.body()
+    cals = json.loads(body).get('cals', None) if body else [None]
+    n = 0
+    for cal in cals:
+        n += await loop.runNow(cal=cal)
+    msg = f"found {n} new records"
+    log.info(msg)
+    return PlainTextResponse(msg)
 
 
 def main():
-    args = docopt.docopt('''
-Usage:
-  gcalendarwatch [options]
-
-Options:
-  -v, --verbose  more logging
-  --now          don't wait for first update
-''')
-
-    verboseLogging(args['--verbose'])
-
-    # fix for oauth2.googleapis.com resolving to the wrong thing and
-    # making a self-signed cert error
-    with open('/etc/resolv.conf', 'w') as resolv:
-        resolv.write('''
-nameserver 10.43.0.10
-search default.svc.cluster.local
-        ''')
-
     agendaGraph = PatchableGraph()  # next few days
     countdownGraph = PatchableGraph()  # next n of starred events
-    conf = json.load(open("gcalendarwatch.conf"))
+    conf = cast(Conf, json.load(open("gcalendarwatch.conf")))
     m = conf['mongo']
     mongoOut = MongoClient(m['host'], m['port'], tz_aware=True)[m['database']][m['collection']]
     sync = SyncToMongo(conf, mongoOut, agendaGraph, countdownGraph)
@@ -423,45 +216,26 @@
     s, e = dayRange(60)
     sync.updateGraphs(read.getEvents(s, e))
 
-    poller = Poller(sync, conf['minutes_between_polls'] * 60)
-    if args['--now']:
-        poller.updateNow()
+    loop = background_loop.loop_forever(functools.partial(update, sync=sync), conf['minutes_between_polls'] * 60)
 
-    class Application(cyclone.web.Application):
+    def getRoot(request: Request) -> HTMLResponse:
+        return HTMLResponse(content=open("index.html").read().replace("MSG", statusMsg(conf, loop)))
 
-        def __init__(self):
-            handlers = [
-                (r"/", Index),
-                (r'/events', EventsPage),
-                (r'/pollNow', PollNow),
-                (r'/graph/calendar/upcoming', CycloneGraphHandler, {
-                    'masterGraph': agendaGraph
-                }),
-                (r'/graph/calendar/upcoming/events', CycloneGraphEventsHandler, {
-                    'masterGraph': agendaGraph
-                }),
-                (r'/graph/calendar/countdown', CycloneGraphHandler, {
-                    'masterGraph': countdownGraph
-                }),
-                (r'/graph/calendar/countdown/events', CycloneGraphEventsHandler, {
-                    'masterGraph': countdownGraph
-                }),
-                (r'/countdowns.json', Countdowns),
-                (r'/metrics', Metrics),
-            ]
-            cyclone.web.Application.__init__(
-                self,
-                handlers,
-                conf=conf,
-                read=read,
-                poller=poller,
-                agendaGraph=agendaGraph,
-                countdownGraph=countdownGraph,
-            )
+    app = Starlette(debug=True,
+                    routes=[
+                        Route('/', getRoot),
+                        Route('/graph/calendar/upcoming', StaticGraph(agendaGraph)),
+                        Route('/graph/calendar/upcoming/events', GraphEvents(agendaGraph)),
+                        Route('/graph/calendar/countdown', StaticGraph(countdownGraph)),
+                        Route('/graph/calendar/countdown/events', GraphEvents(countdownGraph)),
+                        Route('/countdowns.json', functools.partial(Countdowns, countdownGraph)),
+                        Route('/events', functools.partial(EventsPage, read)),
+                        Route('/pollNow', functools.partial(PollNow, loop), methods=['POST'])
+                    ])
 
-    reactor.listenTCP(conf['serve_port'], Application())
-    reactor.run()
+    app.add_middleware(PrometheusMiddleware, app_name='gcalendarwatch')
+    app.add_route("/metrics", handle_metrics)
+    return app
 
 
-if __name__ == '__main__':
-    main()
+app = main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/graphconvert.py	Sun Jul 24 00:58:54 2022 -0700
@@ -0,0 +1,26 @@
+from rdflib import RDF, ConjunctiveGraph, Literal, Namespace, URIRef
+
+EV = Namespace("http://bigasterisk.com/event#")
+
+
+def asGraph(events, extraClasses=[], ctx=EV['gcalendar']):
+    graph = ConjunctiveGraph()
+    graph.namespace_manager.bind('ev', EV)
+    for ev in events:
+        uri = URIRef(ev['uri'])
+
+        def add(p, o):
+            return graph.add((uri, p, o, ctx))
+
+        add(RDF.type, EV['Event'])
+        for cls in extraClasses:
+            add(RDF.type, cls)
+        add(EV['title'], Literal(ev['title']))
+        add(EV['start'], Literal(ev['start']))
+        add(EV['startDate'], Literal(ev['startDate']))
+        add(EV['end'], Literal(ev['end']))
+        add(EV['feed'], URIRef(ev['feed']))
+        # graph.add((feed, RDFS.label, Literal(ev['feedTitle'])))
+        if 'htmlLink' in ev:
+            add(EV['htmlLink'], URIRef(ev['htmlLink']))
+    return graph
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ingest.py	Sun Jul 24 00:58:54 2022 -0700
@@ -0,0 +1,129 @@
+import logging
+import re
+from typing import Any, Dict, Iterable, List, Sequence, cast
+
+import pymongo.collection
+from dateutil.tz import tzlocal
+from googleapiclient.discovery import Resource
+from patchablegraph import PatchableGraph
+from rdflib import Namespace
+
+from calendar_connection import getCalendarService
+from datetimemath import dayRange, limitDays, parse
+from localtypes import Conf, Record
+from graphconvert import asGraph
+
+log = logging.getLogger()
+EV = Namespace("http://bigasterisk.com/event#")
+
+
+def feedFromCalId(conf: Conf, calId: str) -> str:
+    return conf['event_uri_ns'] + 'feed/' + calId
+
+
+def getFirstPageOfCalendars(service: Resource):
+    for row in service.calendarList().list().execute()['items']:
+        yield row['id']
+
+
+def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
+
+    def dateOrTime(d):
+        if 'date' in d:
+            return d['date']
+        return d['dateTime']
+
+    rec = {
+        'uri': conf['event_uri_ns'] + ev['id'],
+        'feed': feedFromCalId(conf, calId),
+        'title': ev.get('summary', '?'),
+        'start': dateOrTime(ev['start']),
+        'end': dateOrTime(ev['end']),
+        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
+        'htmlLink': ev.get('htmlLink', ''),
+        'creatorEmail': ev.get('creator', {}).get('email', ''),
+    }
+
+    for field, val in [('start', ev['start']), ('end', ev['end'])]:
+        if 'date' in val:
+            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
+            rec['%sDate' % field] = val['date']
+        else:
+            rec['%sTime' % field] = parse(val['dateTime'])
+            rec['%sDate' % field] = parse(val['dateTime']).date().isoformat()
+    return rec
+
+
+def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
+    recs = sorted(recs, key=lambda r: r['start'])
+    out = []
+    for rec in recs:
+        if re.search(r'(.*)\*\s*$', rec['title']):
+            out.append(rec)
+            if len(out) >= maxCount:
+                break
+    return out
+
+
+class SyncToMongo(object):
+    """reads gcal, writes to mongodb"""
+    collection: pymongo.collection.Collection
+
+    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
+                 countdownGraph: PatchableGraph):
+        self.conf = conf
+        self.service = getCalendarService()
+        self.collection = collection
+        self.agendaGraph = agendaGraph
+        self.countdownGraph = countdownGraph
+
+    def update(self, days=10, cal=None) -> int:
+        start, end = dayRange(days)
+        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
+        if cal is not None:
+            spec['feed'] = feedFromCalId(self.conf, cal)
+        idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
+        n = self.collection.delete_many(spec)
+        log.info(f'cleared {n} records before reread')
+
+        totalNew = 0
+        currentRecords = []
+        for calId in getFirstPageOfCalendars(self.service):
+            if cal and calId != cal:
+                continue
+            print('read %s' % calId)
+            events = self.service.events().list(
+                calendarId=calId,
+                singleEvents=True,
+                timeMin=start.isoformat(),
+                timeMax=end.isoformat(),
+                showDeleted=False,
+                maxResults=1000,
+            ).execute()
+
+            for ev in events['items']:
+                rec = recordFromEv(self.conf, calId, ev)
+                self.upsertMongo(rec)
+                if rec['uri'] not in idsFormerlyInRange:
+                    totalNew += 1
+                currentRecords.append(rec)
+
+        self.updateGraphs(currentRecords)
+        return totalNew
+
+    def upsertMongo(self, rec: Record) -> List[Record]:
+        if self.collection.find_one({"_id": rec['uri']}) is not None:
+            log.debug("existing record %s", rec['uri'])
+            # this is not yet noticing updates
+            return []
+        else:
+            log.debug("add record %s", rec)
+            d = cast(Dict[str, Any], rec.copy())
+            d['_id'] = d.pop('uri')
+            self.collection.insert_one(d)
+            return [rec]
+
+    def updateGraphs(self, currentRecords: Iterable[Record]):
+        currentRecords = list(currentRecords)
+        self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2)))
+        self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/localtypes.py	Sun Jul 24 00:58:54 2022 -0700
@@ -0,0 +1,30 @@
+import datetime
+from typing import TypedDict
+
+
+class MongoConf(TypedDict):
+    host: str
+    port: int
+    database: str
+    collection: str
+
+
+class Conf(TypedDict):
+    event_uri_ns: str
+    minutes_between_polls: float
+    mongo: MongoConf
+
+
+class Record(TypedDict):
+    uri: str
+    feed: str
+    title: str
+    start: int
+    startTime: datetime.datetime
+    startDate: int
+    end: int
+    endTime: datetime.datetime
+    endDate: int
+    endTimeUnspecified: bool
+    htmlLink: str
+    creatorEmail: str