diff ingest.py @ 28:e2209226b001

rewrite with starlette and background_loop
author drewp@bigasterisk.com
date Sun, 24 Jul 2022 00:58:54 -0700
parents
children d686e4a5b892
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ingest.py	Sun Jul 24 00:58:54 2022 -0700
@@ -0,0 +1,129 @@
+import logging
+import re
+from typing import Any, Dict, Iterable, List, Sequence, cast
+
+import pymongo.collection
+from dateutil.tz import tzlocal
+from googleapiclient.discovery import Resource
+from patchablegraph import PatchableGraph
+from rdflib import Namespace
+
+from calendar_connection import getCalendarService
+from datetimemath import dayRange, limitDays, parse
+from localtypes import Conf, Record
+from graphconvert import asGraph
+
+log = logging.getLogger()
+EV = Namespace("http://bigasterisk.com/event#")
+
+
+def feedFromCalId(conf: Conf, calId: str) -> str:
+    """Return the feed URI for a calendar id.
+
+    The result is conf['event_uri_ns'] followed by 'feed/' and then calId.
+    """
+    feedPrefix = conf['event_uri_ns'] + 'feed/'
+    return feedPrefix + calId
+
+
+def getFirstPageOfCalendars(service: Resource):
+    """Yield the 'id' of each calendar in one calendarList().list() response.
+
+    A single request is executed (when iteration starts); no further pages
+    are requested.
+    """
+    firstPage = service.calendarList().list().execute()
+    yield from (entry['id'] for entry in firstPage['items'])
+
+
+def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
+    """Convert one calendar event dict into a Record.
+
+    ev must have 'id', 'start' and 'end'; 'summary', 'endTimeUnspecified',
+    'htmlLink' and 'creator' are optional and get defaults. Each of
+    ev['start'] / ev['end'] is a dict holding either a 'date' or a
+    'dateTime' string.
+
+    For each of start/end the record carries three values:
+      'start'     -- the raw 'date' or 'dateTime' string
+      'startTime' -- the parsed value; a 'date' gets the local tz attached
+      'startDate' -- the 'date' string, or the ISO date of the parsed 'dateTime'
+    (and likewise 'end', 'endTime', 'endDate').
+    """
+    record = {
+        'uri': conf['event_uri_ns'] + ev['id'],
+        'feed': feedFromCalId(conf, calId),
+        'title': ev.get('summary', '?'),
+    }
+
+    # Raw start/end strings, preferring 'date' when the event has one.
+    for edge in ('start', 'end'):
+        when = ev[edge]
+        record[edge] = when['date'] if 'date' in when else when['dateTime']
+
+    record['endTimeUnspecified'] = ev.get('endTimeUnspecified', False)
+    record['htmlLink'] = ev.get('htmlLink', '')
+    record['creatorEmail'] = ev.get('creator', {}).get('email', '')
+
+    # Parsed forms: <edge>Time and <edge>Date.
+    for edge in ('start', 'end'):
+        when = ev[edge]
+        if 'date' in when:
+            dayText = when['date']
+            record[edge + 'Time'] = parse(dayText).replace(tzinfo=tzlocal())
+            record[edge + 'Date'] = dayText
+        else:
+            moment = parse(when['dateTime'])
+            record[edge + 'Time'] = moment
+            record[edge + 'Date'] = moment.date().isoformat()
+    return record
+
+
+def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
+    """Return up to maxCount records whose title ends with '*', earliest first.
+
+    A title counts as starred when its last non-whitespace character is '*'.
+    Records are ordered by their 'start' value before the cap is applied.
+    A maxCount of zero or less yields an empty list.
+    """
+    if maxCount <= 0:
+        # Without this guard one record would slip through, since the cap
+        # would only be consulted after the first match was collected.
+        return []
+    starred = [rec for rec in sorted(recs, key=lambda r: r['start']) if re.search(r'\*\s*$', rec['title'])]
+    return starred[:maxCount]
+
+
+class SyncToMongo(object):
+    """reads gcal, writes to mongodb
+
+    Also pushes the freshly read records into the agenda and countdown
+    graphs after every update().
+    """
+    collection: pymongo.collection.Collection
+
+    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
+                 countdownGraph: PatchableGraph):
+        """Store the collaborators and obtain a calendar service via getCalendarService()."""
+        self.conf = conf
+        self.service = getCalendarService()
+        self.collection = collection
+        self.agendaGraph = agendaGraph
+        self.countdownGraph = countdownGraph
+
+    def update(self, days=10, cal=None) -> int:
+        """Re-read events for the dayRange(days) window and replace the stored copies.
+
+        Stored documents whose startTime falls in the window (restricted to
+        cal's feed when cal is given) are deleted, events are fetched again
+        from each calendar (only cal, when given), inserted, and both graphs
+        are rebuilt from the fetched records.
+
+        Returns the number of fetched events whose uri was not among the
+        _ids that matched the window before the delete.
+        """
+        start, end = dayRange(days)
+        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
+        if cal is not None:
+            spec['feed'] = feedFromCalId(self.conf, cal)
+        # A set, because it is probed once per fetched event below.
+        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
+        deleted = self.collection.delete_many(spec)
+        # delete_many returns a result object; the count is an attribute of it.
+        log.info(f'cleared {deleted.deleted_count} records before reread')
+
+        totalNew = 0
+        currentRecords = []
+        for calId in getFirstPageOfCalendars(self.service):
+            if cal and calId != cal:
+                continue
+            log.info('read %s', calId)
+            # NOTE(review): only this one response is consumed; a calendar with
+            # more events than fit in it would be truncated -- confirm that is
+            # acceptable for the window sizes in use.
+            events = self.service.events().list(
+                calendarId=calId,
+                singleEvents=True,
+                timeMin=start.isoformat(),
+                timeMax=end.isoformat(),
+                showDeleted=False,
+                maxResults=1000,
+            ).execute()
+
+            for ev in events['items']:
+                rec = recordFromEv(self.conf, calId, ev)
+                self.upsertMongo(rec)
+                if rec['uri'] not in idsFormerlyInRange:
+                    totalNew += 1
+                currentRecords.append(rec)
+
+        self.updateGraphs(currentRecords)
+        return totalNew
+
+    def upsertMongo(self, rec: Record) -> List[Record]:
+        """Insert rec unless a document with _id == rec['uri'] already exists.
+
+        Returns [rec] when a document was inserted and [] when one was
+        already present. The stored document is a copy of rec with 'uri'
+        moved to '_id'; rec itself is not modified.
+        """
+        if self.collection.find_one({"_id": rec['uri']}) is not None:
+            log.debug("existing record %s", rec['uri'])
+            # this is not yet noticing updates
+            return []
+        else:
+            log.debug("add record %s", rec)
+            d = cast(Dict[str, Any], rec.copy())
+            d['_id'] = d.pop('uri')
+            self.collection.insert_one(d)
+            return [rec]
+
+    def updateGraphs(self, currentRecords: Iterable[Record]):
+        """Replace the contents of both graphs from currentRecords.
+
+        The agenda graph gets limitDays(..., days=2) of the records; the
+        countdown graph gets up to 15 starred records, additionally typed
+        as EV['CountdownEvent'].
+        """
+        # Materialize once: the records are consumed twice below.
+        currentRecords = list(currentRecords)
+        self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2)))
+        self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))