Mercurial > code > home > repos > gcalendarwatch
diff ingest.py @ 28:e2209226b001
rewrite with starlette and background_loop
author | drewp@bigasterisk.com |
---|---|
date | Sun, 24 Jul 2022 00:58:54 -0700 |
parents | |
children | d686e4a5b892 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/ingest.py Sun Jul 24 00:58:54 2022 -0700 @@ -0,0 +1,129 @@ +import logging +import re +from typing import Any, Dict, Iterable, List, Sequence, cast + +import pymongo.collection +from dateutil.tz import tzlocal +from googleapiclient.discovery import Resource +from patchablegraph import PatchableGraph +from rdflib import Namespace + +from calendar_connection import getCalendarService +from datetimemath import dayRange, limitDays, parse +from localtypes import Conf, Record +from graphconvert import asGraph + +log = logging.getLogger() +EV = Namespace("http://bigasterisk.com/event#") + + +def feedFromCalId(conf: Conf, calId: str) -> str: + return conf['event_uri_ns'] + 'feed/' + calId + + +def getFirstPageOfCalendars(service: Resource): + for row in service.calendarList().list().execute()['items']: + yield row['id'] + + +def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: + + def dateOrTime(d): + if 'date' in d: + return d['date'] + return d['dateTime'] + + rec = { + 'uri': conf['event_uri_ns'] + ev['id'], + 'feed': feedFromCalId(conf, calId), + 'title': ev.get('summary', '?'), + 'start': dateOrTime(ev['start']), + 'end': dateOrTime(ev['end']), + 'endTimeUnspecified': ev.get('endTimeUnspecified', False), + 'htmlLink': ev.get('htmlLink', ''), + 'creatorEmail': ev.get('creator', {}).get('email', ''), + } + + for field, val in [('start', ev['start']), ('end', ev['end'])]: + if 'date' in val: + rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) + rec['%sDate' % field] = val['date'] + else: + rec['%sTime' % field] = parse(val['dateTime']) + rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() + return rec + + +def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]: + recs = sorted(recs, key=lambda r: r['start']) + out = [] + for rec in recs: + if re.search(r'(.*)\*\s*$', rec['title']): + out.append(rec) + if len(out) >= maxCount: + break + return out + + +class SyncToMongo(object): + """reads gcal, writes to mongodb""" + collection: pymongo.collection.Collection + + def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, + countdownGraph: PatchableGraph): + self.conf = conf + self.service = getCalendarService() + self.collection = collection + self.agendaGraph = agendaGraph + self.countdownGraph = countdownGraph + + def update(self, days=10, cal=None) -> int: + start, end = dayRange(days) + spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} + if cal is not None: + spec['feed'] = feedFromCalId(self.conf, cal) + idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] + n = self.collection.delete_many(spec) + log.info(f'cleared {n} records before reread') + + totalNew = 0 + currentRecords = [] + for calId in getFirstPageOfCalendars(self.service): + if cal and calId != cal: + continue + print('read %s' % calId) + events = self.service.events().list( + calendarId=calId, + singleEvents=True, + timeMin=start.isoformat(), + timeMax=end.isoformat(), + showDeleted=False, + maxResults=1000, + ).execute() + + for ev in events['items']: + rec = recordFromEv(self.conf, calId, ev) + self.upsertMongo(rec) + if rec['uri'] not in idsFormerlyInRange: + totalNew += 1 + currentRecords.append(rec) + + self.updateGraphs(currentRecords) + return totalNew + + def upsertMongo(self, rec: Record) -> List[Record]: + if self.collection.find_one({"_id": rec['uri']}) is not None: + log.debug("existing record %s", rec['uri']) + # this is not yet noticing updates + return [] + else: + log.debug("add record %s", rec) + d = cast(Dict[str, Any], rec.copy()) + d['_id'] = d.pop('uri') + self.collection.insert_one(d) + return [rec] + + def updateGraphs(self, currentRecords: Iterable[Record]): + currentRecords = list(currentRecords) + self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2))) + self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))