Mercurial > code > home > repos > gcalendarwatch
view ingest.py @ 43:b5d3d9a8c83d
new graph of just the events happening now
author | drewp@bigasterisk.com |
---|---|
date | Mon, 19 Feb 2024 13:53:46 -0800 |
parents | 7d9609edcf9c |
children | e53a1bc87f99 |
line wrap: on
line source
import logging import os import re from typing import Any, Dict, Iterable, List, Sequence, cast import pymongo.collection from dateutil.tz import tzlocal from googleapiclient.discovery import Resource from googleapiclient.errors import HttpError from patchablegraph import PatchableGraph from rdflib import Namespace from calendar_connection import getCalendarService from datetimemath import dayRange, limitDays, parse from graphconvert import asGraph from localtypes import Conf, Record, feedFromCalId log = logging.getLogger() EV = Namespace("http://bigasterisk.com/event#") def getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]: for row in service.calendarList().list().execute()['items']: yield row['id'], row.get('summary'), row.get('description') def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: def dateOrTime(d): if 'date' in d: return d['date'] return d['dateTime'] rec= { 'uri': conf['event_uri_ns'] + ev['id'], 'feed': feedFromCalId(conf, calId), 'title': ev.get('summary', '?'), 'start': dateOrTime(ev['start']), 'end': dateOrTime(ev['end']), 'endTimeUnspecified': ev.get('endTimeUnspecified', False), 'htmlLink': ev.get('htmlLink', ''), 'creatorEmail': ev.get('creator', {}).get('email', ''), } for field, val in [('start', ev['start']), ('end', ev['end'])]: if 'date' in val: rec['%sTime' % field] = parse(val['date']).replace(tzinfo=tzlocal()) rec['%sDate' % field] = val['date'] else: rec['%sTime' % field] = parse(val['dateTime']) rec['%sDate' % field] = parse(val['dateTime']).date().isoformat() return rec def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]: recs = sorted(recs, key=lambda r: r['start']) out = [] for rec in recs: if re.search(r'(.*)\*\s*$', rec['title']): out.append(rec) if len(out) >= maxCount: break return out class SyncToMongo(object): """reads gcal, writes to mongodb and graphs""" collection: pymongo.collection.Collection def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph): self.conf = conf self.service = getCalendarService() self.collection = collection self.calendarsCollection = collection.database.get_collection('gcalendar_cals') self.agendaGraph = agendaGraph self.countdownGraph = countdownGraph def update(self, days=10, cal=None) -> int: start, end = dayRange(days) idsFormerlyInRange = self.clearByStartTime(cal, start, end) totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange) self.updateGraphs(currentRecords) return totalNew def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange): totalNew = 0 currentRecords = [] try: cals = getFirstPageOfCalendars(self.service) except HttpError: log.error('on getFirstPageOfCalendars') os.abort() for calId, summary, description in cals: self.calendarsCollection.update_one({'_id': calId}, {'$set': { 'summary': summary, 'description': description, }}, upsert=True) if cal and calId != cal: continue try: self.updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId) except HttpError: log.error(f"on cal {calId}") return totalNew, currentRecords def clearByStartTime(self, cal, start, end): spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} if cal is not None: spec['feed'] = feedFromCalId(self.conf, cal) idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] n = self.collection.delete_many(spec) log.info(f'cleared {n} records before reread') return idsFormerlyInRange def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId): print('read %s' % calId) events = self.service.events().list( calendarId=calId, singleEvents=True, timeMin=start.isoformat(), timeMax=end.isoformat(), showDeleted=False, maxResults=1000, ).execute() for ev in events['items']: rec = recordFromEv(self.conf, calId, ev) self.upsertMongo(rec) if rec['uri'] not in idsFormerlyInRange: totalNew += 1 currentRecords.append(rec) def upsertMongo(self, rec: Record) -> List[Record]: if self.collection.find_one({"_id": rec['uri']}) is not None: log.debug("existing record %s", rec['uri']) # this is not yet noticing updates return [] else: log.debug("add record %s", rec) d = cast(Dict[str, Any], rec.copy()) d['_id'] = d.pop('uri') self.collection.insert_one(d) return [rec] def updateGraphs(self, currentRecords: Iterable[Record]): currentRecords = list(currentRecords) cals = list(self.calendarsCollection.find()) self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2))) self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))