Mercurial > code > home > repos > gcalendarwatch
view ingest.py @ 42:7d9609edcf9c
track calendar feed summary/description text and emit them in graphs
author | drewp@bigasterisk.com |
---|---|
date | Sun, 18 Feb 2024 12:34:53 -0800 |
parents | d686e4a5b892 |
children | e53a1bc87f99 |
line wrap: on
line source
import logging import os import re from typing import Any, Dict, Iterable, List, Sequence, cast import pymongo.collection from dateutil.tz import tzlocal from googleapiclient.discovery import Resource from googleapiclient.errors import HttpError from patchablegraph import PatchableGraph from rdflib import Namespace from calendar_connection import getCalendarService from datetimemath import dayRange, limitDays, parse from graphconvert import asGraph from localtypes import Conf, Record, feedFromCalId log = logging.getLogger() EV = Namespace("http://bigasterisk.com/event#") def getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]: for row in service.calendarList().list().execute()['items']: yield row['id'], row.get('summary'), row.get('description') def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: def dateOrTime(d): if 'date' in d: return d['date'] return d['dateTime'] rec= { 'uri': conf['event_uri_ns'] + ev['id'], 'feed': feedFromCalId(conf, calId), 'title': ev.get('summary', '?'), 'start': dateOrTime(ev['start']), 'end': dateOrTime(ev['end']), 'endTimeUnspecified': ev.get('endTimeUnspecified', False), 'htmlLink': ev.get('htmlLink', ''), 'creatorEmail': ev.get('creator', {}).get('email', ''), } for field, val in [('start', ev['start']), ('end', ev['end'])]: if 'date' in val: rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) rec['%sDate' % field] = val['date'] else: rec['%sTime' % field] = parse(val['dateTime']) rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() return rec def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]: recs = sorted(recs, key=lambda r: r['start']) out = [] for rec in recs: if re.search(r'(.*)\*\s*$', rec['title']): out.append(rec) if len(out) >= maxCount: break return out class SyncToMongo(object): """reads gcal, writes to mongodb and graphs""" collection: pymongo.collection.Collection def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph): self.conf = conf self.service = getCalendarService() self.collection = collection self.calendarsCollection = collection.database.get_collection('gcalendar_cals') self.agendaGraph = agendaGraph self.countdownGraph = countdownGraph def update(self, days=10, cal=None) -> int: start, end = dayRange(days) idsFormerlyInRange = self.clearByStartTime(cal, start, end) totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange) self.updateGraphs(currentRecords) return totalNew def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange): totalNew = 0 currentRecords = [] try: cals = getFirstPageOfCalendars(self.service) except HttpError: log.error('on getFirstPageOfCalendars') os.abort() for calId, summary, description in cals: self.calendarsCollection.update_one({'_id': calId}, {'$set': { 'summary': summary, 'description': description, }}, upsert=True) if cal and calId != cal: continue try: self.updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId) except HttpError: log.error(f"on cal {calId}") return totalNew, currentRecords def clearByStartTime(self, cal, start, end): spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} if cal is not None: spec['feed'] = feedFromCalId(self.conf, cal) idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] n = self.collection.delete_many(spec) log.info(f'cleared {n} records before reread') return idsFormerlyInRange def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId): print('read %s' % calId) events = self.service.events().list( calendarId=calId, singleEvents=True, timeMin=start.isoformat(), timeMax=end.isoformat(), showDeleted=False, maxResults=1000, ).execute() for ev in events['items']: rec = recordFromEv(self.conf, calId, ev) self.upsertMongo(rec) if rec['uri'] not in idsFormerlyInRange: totalNew += 1 currentRecords.append(rec) def upsertMongo(self, rec: Record) -> List[Record]: if self.collection.find_one({"_id": rec['uri']}) is not None: log.debug("existing record %s", rec['uri']) # this is not yet noticing updates return [] else: log.debug("add record %s", rec) d = cast(Dict[str, Any], rec.copy()) d['_id'] = d.pop('uri') self.collection.insert_one(d) return [rec] def updateGraphs(self, currentRecords: Iterable[Record]): currentRecords = list(currentRecords) cals = list(self.calendarsCollection.find()) self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2))) self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))