Mercurial > code > home > repos > gcalendarwatch
view ingest.py @ 36:cb990883e52f
deployment; dep updates
author | drewp@bigasterisk.com |
---|---|
date | Sun, 12 Nov 2023 23:19:23 -0800 |
parents | e2209226b001 |
children | d686e4a5b892 |
line wrap: on
line source
import logging import re from typing import Any, Dict, Iterable, List, Sequence, cast import pymongo.collection from dateutil.tz import tzlocal from googleapiclient.discovery import Resource from patchablegraph import PatchableGraph from rdflib import Namespace from calendar_connection import getCalendarService from datetimemath import dayRange, limitDays, parse from localtypes import Conf, Record from graphconvert import asGraph log = logging.getLogger() EV = Namespace("http://bigasterisk.com/event#") def feedFromCalId(conf: Conf, calId: str) -> str: return conf['event_uri_ns'] + 'feed/' + calId def getFirstPageOfCalendars(service: Resource): for row in service.calendarList().list().execute()['items']: yield row['id'] def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: def dateOrTime(d): if 'date' in d: return d['date'] return d['dateTime'] rec = { 'uri': conf['event_uri_ns'] + ev['id'], 'feed': feedFromCalId(conf, calId), 'title': ev.get('summary', '?'), 'start': dateOrTime(ev['start']), 'end': dateOrTime(ev['end']), 'endTimeUnspecified': ev.get('endTimeUnspecified', False), 'htmlLink': ev.get('htmlLink', ''), 'creatorEmail': ev.get('creator', {}).get('email', ''), } for field, val in [('start', ev['start']), ('end', ev['end'])]: if 'date' in val: rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) rec['%sDate' % field] = val['date'] else: rec['%sTime' % field] = parse(val['dateTime']) rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() return rec def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]: recs = sorted(recs, key=lambda r: r['start']) out = [] for rec in recs: if re.search(r'(.*)\*\s*$', rec['title']): out.append(rec) if len(out) >= maxCount: break return out class SyncToMongo(object): """reads gcal, writes to mongodb""" collection: pymongo.collection.Collection def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph): self.conf = conf self.service = getCalendarService() self.collection = collection self.agendaGraph = agendaGraph self.countdownGraph = countdownGraph def update(self, days=10, cal=None) -> int: start, end = dayRange(days) spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} if cal is not None: spec['feed'] = feedFromCalId(self.conf, cal) idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] n = self.collection.delete_many(spec) log.info(f'cleared {n} records before reread') totalNew = 0 currentRecords = [] for calId in getFirstPageOfCalendars(self.service): if cal and calId != cal: continue print('read %s' % calId) events = self.service.events().list( calendarId=calId, singleEvents=True, timeMin=start.isoformat(), timeMax=end.isoformat(), showDeleted=False, maxResults=1000, ).execute() for ev in events['items']: rec = recordFromEv(self.conf, calId, ev) self.upsertMongo(rec) if rec['uri'] not in idsFormerlyInRange: totalNew += 1 currentRecords.append(rec) self.updateGraphs(currentRecords) return totalNew def upsertMongo(self, rec: Record) -> List[Record]: if self.collection.find_one({"_id": rec['uri']}) is not None: log.debug("existing record %s", rec['uri']) # this is not yet noticing updates return [] else: log.debug("add record %s", rec) d = cast(Dict[str, Any], rec.copy()) d['_id'] = d.pop('uri') self.collection.insert_one(d) return [rec] def updateGraphs(self, currentRecords: Iterable[Record]): currentRecords = list(currentRecords) self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2))) self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))