view ingest.py @ 45:e53a1bc87f99

cleanup and some fixes to starred event graph
author drewp@bigasterisk.com
date Thu, 06 Jun 2024 15:57:26 -0700
parents 7d9609edcf9c
children
line wrap: on
line source

import logging
import os
import re
from typing import Any, Dict, Iterable, List, Sequence, cast

import pymongo.collection
from dateutil.tz import tzlocal
from googleapiclient.discovery import Resource
from googleapiclient.errors import HttpError
from patchablegraph import PatchableGraph
from rdflib import Namespace

from calendar_connection import getCalendarService
from datetimemath import dayRange, limitDays, parse
from graphconvert import asGraph
from localtypes import Conf, Record, feedFromCalId

log = logging.getLogger()
EV = Namespace("http://bigasterisk.com/event#")


def _getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]:
    """Yield (id, summary, description) for each entry of the calendar list.

    Only the single response returned by calendarList().list().execute() is
    read; no further pages are requested. summary and description are None
    when the entry does not carry them.

    This is a generator function, so the request is not issued until the
    result is first iterated.
    """
    response = service.calendarList().list().execute()
    for entry in response['items']:
        calId = entry['id']
        summary = entry.get('summary')
        description = entry.get('description')
        yield calId, summary, description


def _recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
    """Convert one calendar API event dict into a Record.

    The record's 'uri' is conf['event_uri_ns'] + ev['id'] and its 'feed' comes
    from feedFromCalId. 'start'/'end' hold the raw string the API sent, while
    'startTime'/'endTime' hold parsed datetimes and 'startDate'/'endDate'
    hold ISO date strings.
    """

    def rawWhen(spec):
        # Prefer the 'date' form when present, otherwise fall back to 'dateTime'.
        return spec['date'] if 'date' in spec else spec['dateTime']

    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': rawWhen(ev['start']),
        'end': rawWhen(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
    }

    for field in ('start', 'end'):
        spec = ev[field]
        if 'date' in spec:
            # Date-only value: the parsed datetime is stamped with the local timezone.
            day = spec['date']
            moment = parse(day).replace(tzinfo=tzlocal())
        else:
            moment = parse(spec['dateTime'])
            day = moment.date().isoformat()
        rec[f'{field}Time'] = moment
        rec[f'{field}Date'] = day

    return cast(Record, rec)


def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
    """Return the earliest starred records with the star removed from the title.

    A record counts as starred when its 'title' ends with ``*``, optionally
    followed by whitespace. Records are ordered by their 'start' value and at
    most maxCount of them are returned; a maxCount of zero or less yields an
    empty list.

    Each returned record is a shallow copy whose 'title' is the text that
    preceded the star, so the records passed in are left unmodified.
    """
    if maxCount <= 0:
        # Without this guard the loop below appends one record before it
        # ever compares against the limit.
        return []

    out = []
    for rec in sorted(recs, key=lambda r: r['start']):
        m = re.search(r'(.*)\*\s*$', rec['title'])
        if m is None:
            continue
        starred = rec.copy()
        starred['title'] = m.group(1)
        out.append(starred)
        if len(out) >= maxCount:
            break
    return out


class SyncToMongo(object):
    """Reads Google Calendar events, writes them to mongodb, and refreshes the graphs."""
    collection: pymongo.collection.Collection

    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
        """Store the collaborators and open the calendar service.

        Calendar metadata is kept in the 'gcalendar_cals' collection of the
        same database that holds `collection`.
        """
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.calendarsCollection = collection.database.get_collection('gcalendar_cals')
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=10, cal=None) -> int:
        """Re-read events in the dayRange(days) window and refresh both graphs.

        Stored records whose startTime falls in the window are deleted first
        (only those of `cal`'s feed when `cal` is given), then events are
        fetched again and stored.

        Returns the number of fetched events whose uri was not among the
        records that were stored for the window before this call.
        """
        start, end = dayRange(days)
        idsFormerlyInRange = self._clearByStartTime(cal, start, end)

        totalNew, currentRecords = self._gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)

        self.updateGraphs(currentRecords)
        return totalNew

    def _gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
        """Fetch events for every calendar (or only `cal`) and store them.

        Every listed calendar has its summary/description upserted into the
        calendars collection, even when it is then skipped because it is not
        `cal`. An HttpError while reading one calendar's events is logged and
        that calendar is skipped.

        Returns (totalNew, currentRecords).
        """
        totalNew = 0
        currentRecords = []
        try:
            # _getFirstPageOfCalendars is a generator, so the request only runs
            # when it is iterated. Materialize it here so an HttpError is
            # raised inside this try block rather than in the loop below.
            cals = list(_getFirstPageOfCalendars(self.service))
        except HttpError:
            log.error('on getFirstPageOfCalendars')
            os.abort()
        for calId, summary, description in cals:
            self.calendarsCollection.update_one({'_id': calId}, {'$set': {
                'summary': summary,
                'description': description,
            }}, upsert=True)
            if cal and calId != cal:
                continue
            try:
                # ints are passed by value, so the running count has to come
                # back as the return value.
                totalNew = self._updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId)
            except HttpError:
                log.error(f"on cal {calId}")
        return totalNew, currentRecords

    def _clearByStartTime(self, cal, start, end):
        """Delete stored records with start <= startTime <= end.

        When `cal` is not None only records of that calendar's feed are
        deleted. Returns the set of _id values of the deleted records.
        """
        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # A set, because callers only test membership against it.
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        result = self.collection.delete_many(spec)
        log.info(f'cleared {result.deleted_count} records before reread')
        return idsFormerlyInRange

    def _updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId):
        """Fetch one calendar's events in [start, end] and store them.

        Each event is converted to a Record, written to mongo, and appended to
        `currentRecords` (mutated in place). Only the single response of
        events().list(...) is read; no further pages are requested.

        Returns `totalNew` increased by the number of events whose uri is not
        in `idsFormerlyInRange`.
        """
        print('read %s' % calId)
        events = self.service.events().list(
            calendarId=calId,
            singleEvents=True,
            timeMin=start.isoformat(),
            timeMax=end.isoformat(),
            showDeleted=False,
            maxResults=1000,
        ).execute()

        for ev in events['items']:
            rec = _recordFromEv(self.conf, calId, ev)
            self._upsertMongo(rec)
            if rec['uri'] not in idsFormerlyInRange:
                totalNew += 1
            currentRecords.append(rec)
        return totalNew

    def _upsertMongo(self, rec: Record) -> List[Record]:
        """Insert `rec` unless a document with its uri as _id already exists.

        Returns [rec] when a document was inserted and [] when one was
        already present. The stored document uses the record's 'uri' as its
        '_id' and has no 'uri' key.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = cast(Dict[str, Any], rec.copy())
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords: Iterable[Record]):
        """Replace the contents of both graphs from `currentRecords`.

        The agenda graph receives the records kept by limitDays(..., days=2);
        the countdown graph receives up to 15 starred records (see
        filterStarred) with EV['CountdownEvent'] passed as an extra class.
        """
        currentRecords = list(currentRecords)
        cals = list(self.calendarsCollection.find())
        self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2)))
        self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))