Mercurial > code > home > repos > gcalendarwatch
view ingest.py @ 45:e53a1bc87f99
cleanup and some fixes to starred event graph
author | drewp@bigasterisk.com |
---|---|
date | Thu, 06 Jun 2024 15:57:26 -0700 |
parents | 7d9609edcf9c |
children |
line wrap: on
line source
import logging
import os
import re
from typing import Any, Dict, Iterable, List, Sequence, cast

import pymongo.collection
from dateutil.tz import tzlocal
from googleapiclient.discovery import Resource
from googleapiclient.errors import HttpError
from patchablegraph import PatchableGraph
from rdflib import Namespace

from calendar_connection import getCalendarService
from datetimemath import dayRange, limitDays, parse
from graphconvert import asGraph
from localtypes import Conf, Record, feedFromCalId

log = logging.getLogger()
EV = Namespace("http://bigasterisk.com/event#")


def _getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]:
    """Yield (id, summary, description) for each calendar in the first calendarList page.

    This is a generator: the HTTP request is not issued until the first item
    is requested, so any HttpError surfaces at iteration time, not call time.
    Only the first response page is read; no page token is followed.
    """
    for row in service.calendarList().list().execute()['items']:
        yield row['id'], row.get('summary'), row.get('description')


def _recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
    """Convert one Google Calendar event dict into a Record.

    `ev['start']` and `ev['end']` each carry either a 'date' key or a
    'dateTime' key. The raw string is stored under 'start'/'end'; a parsed
    value and an ISO date string are stored under '<field>Time' and
    '<field>Date'.
    """

    def dateOrTime(d):
        if 'date' in d:
            return d['date']
        return d['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': dateOrTime(ev['start']),
        'end': dateOrTime(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for field, val in [('start', ev['start']), ('end', ev['end'])]:
        if 'date' in val:
            # Date-only value: the parsed result is tagged with the local timezone.
            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
            rec['%sDate' % field] = val['date']
        else:
            parsed = parse(val['dateTime'])
            rec['%sTime' % field] = parsed
            rec['%sDate' % field] = parsed.date().isoformat()
    return cast(Record, rec)


def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
    """Return up to `maxCount` records whose title ends with '*', ordered by 'start'.

    Matching records are copied and the trailing '*' (plus any whitespace
    after it) is removed from the copy's title; the input records are not
    modified. The limit is checked after each append, so at least one
    matching record is returned even when `maxCount` is below 1.
    """
    recs = sorted(recs, key=lambda r: r['start'])
    out = []
    for rec in recs:
        if m := re.search(r'(.*)\*\s*$', rec['title']):
            rec = rec.copy()
            rec['title'] = m.group(1)
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out


class SyncToMongo(object):
    """reads gcal, writes to mongodb and graphs"""
    collection: pymongo.collection.Collection

    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
                 countdownGraph: PatchableGraph):
        """Store the collaborators and open the calendar service.

        Calendar metadata is kept in the 'gcalendar_cals' collection of the
        same database as `collection`.
        """
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.calendarsCollection = collection.database.get_collection('gcalendar_cals')
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=10, cal=None) -> int:
        """Re-read events in the `dayRange(days)` window and refresh both graphs.

        If `cal` is given, only that calendar id is cleared and re-read.
        Returns the number of events whose uri was not already stored in the
        window before this run.
        """
        start, end = dayRange(days)
        idsFormerlyInRange = self._clearByStartTime(cal, start, end)

        totalNew, currentRecords = self._gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)

        self.updateGraphs(currentRecords)
        return totalNew

    def _gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
        """Upsert calendar metadata and read events from each selected calendar.

        Returns (totalNew, currentRecords). A failure to list calendars logs
        and aborts the process; a failure on a single calendar is logged and
        that calendar is skipped.
        """
        try:
            # _getFirstPageOfCalendars is a lazy generator; it must be consumed
            # inside the try or the HttpError would escape this handler.
            cals = list(_getFirstPageOfCalendars(self.service))
        except HttpError:
            log.exception('on getFirstPageOfCalendars')
            os.abort()

        totalNew = 0
        currentRecords: List[Record] = []
        # Build the lookup once so per-event membership checks are O(1).
        formerIds = set(idsFormerlyInRange)

        for calId, summary, description in cals:
            self.calendarsCollection.update_one({'_id': calId}, {'$set': {
                'summary': summary,
                'description': description,
            }}, upsert=True)

            if cal and calId != cal:
                continue
            try:
                # ints are immutable, so the running count has to come back
                # as a return value rather than via the argument.
                totalNew = self._updateOneCal(start, end, formerIds, totalNew, currentRecords, calId)
            except HttpError:
                log.exception("on cal %s", calId)
        return totalNew, currentRecords

    def _clearByStartTime(self, cal, start, end):
        """Delete stored events with startTime in [start, end]; return their former ids.

        When `cal` is not None, the deletion is limited to that calendar's feed.
        """
        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
        result = self.collection.delete_many(spec)
        log.info('cleared %s records before reread', result.deleted_count)
        return idsFormerlyInRange

    def _updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId):
        """Fetch events for one calendar, store them, and append them to `currentRecords`.

        `idsFormerlyInRange` may be any container supporting `in`. Returns
        `totalNew` plus the number of fetched events whose uri is not in
        `idsFormerlyInRange`. Only a single response of at most 1000 events
        is read.
        """
        print('read %s' % calId)
        events = self.service.events().list(
            calendarId=calId,
            singleEvents=True,
            timeMin=start.isoformat(),
            timeMax=end.isoformat(),
            showDeleted=False,
            maxResults=1000,
        ).execute()

        for ev in events['items']:
            rec = _recordFromEv(self.conf, calId, ev)
            self._upsertMongo(rec)
            if rec['uri'] not in idsFormerlyInRange:
                totalNew += 1
            currentRecords.append(rec)
        return totalNew

    def _upsertMongo(self, rec: Record) -> List[Record]:
        """Insert `rec` keyed by its uri unless a document with that id exists.

        Returns [rec] when inserted and [] when a document was already present.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = cast(Dict[str, Any], rec.copy())
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords: Iterable[Record]):
        """Replace the agenda and countdown graph contents from `currentRecords`.

        The agenda graph receives `limitDays(currentRecords, days=2)`; the
        countdown graph receives up to 15 starred records tagged with
        EV['CountdownEvent'].
        """
        currentRecords = list(currentRecords)
        cals = list(self.calendarsCollection.find())
        self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2)))
        self.countdownGraph.setToGraph(
            asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))