Mercurial > code > home > repos > gcalendarwatch
view ingest.py @ 41:ab54c9f65f76
project updates; rdflib 7.0.0
author | drewp@bigasterisk.com |
---|---|
date | Sun, 18 Feb 2024 12:33:52 -0800 |
parents | d686e4a5b892 |
children | 7d9609edcf9c |
line wrap: on
line source
import logging
import os
import re
from typing import Any, Dict, Iterable, List, Sequence, cast

import pymongo.collection
from dateutil.tz import tzlocal
from googleapiclient.discovery import Resource
from googleapiclient.errors import HttpError
from patchablegraph import PatchableGraph
from rdflib import Namespace

from calendar_connection import getCalendarService
from datetimemath import dayRange, limitDays, parse
from graphconvert import asGraph
from localtypes import Conf, Record

log = logging.getLogger()
EV = Namespace("http://bigasterisk.com/event#")

# A title counts as "starred" when it ends with '*' (optionally followed by
# whitespace). Compiled once because filterStarred applies it per record.
_STARRED_TITLE = re.compile(r'(.*)\*\s*$')


def feedFromCalId(conf: Conf, calId: str) -> str:
    """Return the feed URI for a calendar id, under conf['event_uri_ns']."""
    return conf['event_uri_ns'] + 'feed/' + calId


def getFirstPageOfCalendars(service: Resource):
    """Yield the id of each calendar in a single calendarList().list() response.

    No follow-up page requests are made. This is a generator, so the API
    request (and any HttpError it raises) happens when the result is first
    iterated, not when this function is called.
    """
    for row in service.calendarList().list().execute()['items']:
        yield row['id']


def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
    """Convert one calendar API event dict into a Record.

    `ev['start']` and `ev['end']` must each contain either a 'date' or a
    'dateTime' key. For each of them the record gets:

    - 'start' / 'end': the raw 'date' or 'dateTime' string,
    - 'startTime' / 'endTime': the parsed value; date-only values get the
      local timezone attached,
    - 'startDate' / 'endDate': the date as a string ('date' as given, or the
      ISO date of the parsed 'dateTime').
    """

    def dateOrTime(d):
        if 'date' in d:
            return d['date']
        return d['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': dateOrTime(ev['start']),
        'end': dateOrTime(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for field in ('start', 'end'):
        val = ev[field]
        if 'date' in val:
            rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal())
            rec['%sDate' % field] = val['date']
        else:
            # Parse once and derive both the timestamp and the date from it.
            when = parse(val['dateTime'])
            rec['%sTime' % field] = when
            rec['%sDate' % field] = when.date().isoformat()
    return rec


def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
    """Return up to `maxCount` records whose title ends in '*', ordered by 'start'."""
    out = []
    for rec in sorted(recs, key=lambda r: r['start']):
        if _STARRED_TITLE.search(rec['title']):
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out


class SyncToMongo(object):
    """reads gcal, writes to mongodb"""
    collection: pymongo.collection.Collection

    def __init__(self, conf: Conf, collection: pymongo.collection.Collection,
                 agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
        """Store the config, collection and output graphs; open the calendar service."""
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=10, cal=None) -> int:
        """Re-read events in the `dayRange(days)` window and refresh the graphs.

        Stored records whose startTime is in the window are deleted first,
        then events are fetched again (only for calendar `cal` if given).

        Returns the number of fetched events that were not among the
        records deleted from the window.
        """
        start, end = dayRange(days)
        idsFormerlyInRange = self.clearByStartTime(cal, start, end)

        totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)

        self.updateGraphs(currentRecords)
        return totalNew

    def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
        """Fetch events from every calendar (or just `cal`) between start and end.

        Returns `(totalNew, currentRecords)`: the count of events whose uri is
        not in `idsFormerlyInRange`, and the list of all fetched records.

        If the calendar list cannot be read the process is aborted. A failure
        on an individual calendar is logged and that calendar is skipped.
        """
        totalNew = 0
        currentRecords = []
        try:
            # getFirstPageOfCalendars is a generator: materialise it here so
            # the API call runs (and can fail) inside this try block.
            calIds = list(getFirstPageOfCalendars(self.service))
        except HttpError:
            log.error('on getFirstPageOfCalendars')
            os.abort()

        # Build the lookup once; it is probed for every fetched event.
        formerIds = set(idsFormerlyInRange)

        for calId in calIds:
            if cal and calId != cal:
                continue
            try:
                # ints are passed by value, so take the updated total from
                # the return value instead of expecting in-place mutation.
                totalNew = self.updateOneCal(start, end, formerIds, totalNew, currentRecords, calId)
            except HttpError:
                log.exception(f"on cal {calId}")
        return totalNew, currentRecords

    def clearByStartTime(self, cal, start, end):
        """Delete stored records with start <= startTime <= end.

        When `cal` is given, only that calendar's feed is cleared.
        Returns the `_id` values of the documents that matched before deletion.
        """
        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
        result = self.collection.delete_many(spec)
        # delete_many returns a result object; log the count, not its repr.
        log.info('cleared %s records before reread', result.deleted_count)
        return idsFormerlyInRange

    def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId) -> int:
        """Fetch one calendar's events in [start, end] and store them.

        Every fetched record is appended to `currentRecords` (mutated in
        place) and passed to `upsertMongo`.

        Returns `totalNew` plus the number of fetched events whose uri is not
        in `idsFormerlyInRange`.

        HttpError from the events request is not caught here.
        """
        log.info('read %s', calId)
        # NOTE(review): only one response of up to maxResults events is read;
        # no further pages are requested — confirm this is enough for the window.
        events = self.service.events().list(
            calendarId=calId,
            singleEvents=True,
            timeMin=start.isoformat(),
            timeMax=end.isoformat(),
            showDeleted=False,
            maxResults=1000,
        ).execute()

        for ev in events['items']:
            rec = recordFromEv(self.conf, calId, ev)
            self.upsertMongo(rec)
            if rec['uri'] not in idsFormerlyInRange:
                totalNew += 1
            currentRecords.append(rec)
        return totalNew

    def upsertMongo(self, rec: Record) -> List[Record]:
        """Insert `rec` keyed by its uri unless a document with that `_id` exists.

        Returns `[rec]` when a document was inserted, `[]` when one already
        existed (the existing document is left as-is).
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = cast(Dict[str, Any], rec.copy())
            # Mongo's primary key is the event uri.
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords: Iterable[Record]):
        """Rebuild the agenda graph and the countdown graph from `currentRecords`.

        The agenda graph gets `limitDays(currentRecords, days=2)`; the
        countdown graph gets up to 15 starred records, typed as
        EV:CountdownEvent.
        """
        # Materialise first: the records are consumed twice below.
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2)))
        self.countdownGraph.setToGraph(
            asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))