view ingest.py @ 36:cb990883e52f

deployment; dep updates
author drewp@bigasterisk.com
date Sun, 12 Nov 2023 23:19:23 -0800
parents e2209226b001
children d686e4a5b892
line wrap: on
line source

import logging
import re
from typing import Any, Dict, Iterable, List, Sequence, cast

import pymongo.collection
from dateutil.tz import tzlocal
from googleapiclient.discovery import Resource
from patchablegraph import PatchableGraph
from rdflib import Namespace

from calendar_connection import getCalendarService
from datetimemath import dayRange, limitDays, parse
from localtypes import Conf, Record
from graphconvert import asGraph

# Module-wide logger; getLogger() with no name returns the root logger.
log = logging.getLogger()
# RDF namespace for event terms; used in this file as EV['CountdownEvent'].
EV = Namespace("http://bigasterisk.com/event#")


def feedFromCalId(conf: Conf, calId: str) -> str:
    """Return the feed URI for a calendar id.

    The URI is conf['event_uri_ns'] followed by 'feed/' and then calId.
    """
    feedPrefix = conf['event_uri_ns'] + 'feed/'
    return feedPrefix + calId


def getFirstPageOfCalendars(service: Resource):
    """Yield the 'id' of each calendar in one calendarList().list() response.

    Only a single list() call is made; any further result pages are not
    requested. The request runs when iteration starts, not when this
    function is called.
    """
    listing = service.calendarList().list().execute()
    yield from (entry['id'] for entry in listing['items'])


def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
    """Convert one calendar API event dict into a flat record dict.

    Required keys of ev: 'id', 'start', 'end'. The 'start' and 'end' values
    are dicts holding either a 'date' or a 'dateTime' string.
    Optional keys ('summary', 'endTimeUnspecified', 'htmlLink',
    'creator' -> 'email') fall back to defaults when missing.

    For each of start/end the record carries three values:
      - '<field>': the raw 'date' or 'dateTime' string
      - '<field>Time': the parsed value (a 'date' input gets the local tz)
      - '<field>Date': the date part as an ISO string
    """

    def rawWhen(when):
        # Prefer the 'date' key when present, otherwise use 'dateTime'.
        # NOTE(review): presumably 'date' marks all-day events — confirm.
        return when['date'] if 'date' in when else when['dateTime']

    creator = ev.get('creator', {})
    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': rawWhen(ev['start']),
        'end': rawWhen(ev['end']),
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': creator.get('email', ''),
    }

    for field in ('start', 'end'):
        when = ev[field]
        timeKey = field + 'Time'
        dateKey = field + 'Date'
        if 'date' not in when:
            moment = parse(when['dateTime'])
            rec[timeKey] = moment
            rec[dateKey] = moment.date().isoformat()
            continue
        # A bare date has no zone of its own; stamp it with the local one.
        rec[timeKey] = parse(when['date']).replace(tzinfo=tzlocal())
        rec[dateKey] = when['date']
    return rec


def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
    """Return the earliest starred records, at most maxCount of them.

    A record is "starred" when its 'title' ends with '*', optionally
    followed by whitespace. Records are ordered by their 'start' value
    before filtering, so the result is the first maxCount starred records
    in start order.

    Args:
        recs: records with at least 'start' and 'title' keys.
        maxCount: upper bound on the number of records returned. Zero or a
            negative value yields an empty list.

    Returns:
        A new list of the selected records; the input is not modified.
    """
    # Without this guard the limit is only checked after the first append,
    # so maxCount=0 would still return one record.
    if maxCount <= 0:
        return []

    out = []
    for rec in sorted(recs, key=lambda r: r['start']):
        # re.search already scans from any position, so no leading '(.*)'
        # is needed to find a trailing star.
        if re.search(r'\*\s*$', rec['title']):
            out.append(rec)
            if len(out) >= maxCount:
                break
    return out


class SyncToMongo(object):
    """Reads events from Google Calendar and writes them to MongoDB.

    After each read, the agenda and countdown graphs are replaced with
    graphs built from the freshly read records.
    """
    # Mongo collection holding one document per event, keyed by event URI.
    collection: pymongo.collection.Collection

    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
                 countdownGraph: PatchableGraph):
        """Store the collaborators and open the calendar service.

        Args:
            conf: configuration; 'event_uri_ns' is used to build URIs.
            collection: Mongo collection that receives the event records.
            agendaGraph: graph updated with the near-term events.
            countdownGraph: graph updated with the starred events.
        """
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=10, cal=None) -> int:
        """Re-read events in the dayRange(days) window and store them.

        Existing documents whose startTime lies in the window (restricted
        to one feed when cal is given) are deleted first and then re-added
        from the calendar service, so in-range edits are picked up.

        Args:
            days: passed to dayRange() to compute the time window.
            cal: optional calendar id; when given, only that calendar is
                cleared and re-read.

        Returns:
            The number of fetched events whose URI was not stored in the
            window before this call.
        """
        start, end = dayRange(days)
        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # A set, because membership is tested once per fetched event.
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        deleted = self.collection.delete_many(spec)
        # delete_many returns a result object; the count is an attribute.
        log.info('cleared %s records before reread', deleted.deleted_count)

        totalNew = 0
        currentRecords = []
        for calId in getFirstPageOfCalendars(self.service):
            if cal and calId != cal:
                continue
            log.info('read %s', calId)
            for ev in self._eventsInRange(calId, start, end):
                rec = recordFromEv(self.conf, calId, ev)
                self.upsertMongo(rec)
                if rec['uri'] not in idsFormerlyInRange:
                    totalNew += 1
                currentRecords.append(rec)

        self.updateGraphs(currentRecords)
        return totalNew

    def _eventsInRange(self, calId: str, start, end) -> Iterable[Dict]:
        """Yield every event of calId between start and end, across pages.

        The service returns at most maxResults events per response and a
        'nextPageToken' when more remain; requests are repeated until no
        token is returned so that large calendars are not truncated.
        """
        pageToken = None
        while True:
            events = self.service.events().list(
                calendarId=calId,
                singleEvents=True,
                timeMin=start.isoformat(),
                timeMax=end.isoformat(),
                showDeleted=False,
                maxResults=1000,
                pageToken=pageToken,
            ).execute()
            yield from events.get('items', [])
            pageToken = events.get('nextPageToken')
            if not pageToken:
                break

    def upsertMongo(self, rec: Record) -> List[Record]:
        """Insert rec unless a document with the same URI already exists.

        The stored document is a copy of rec with 'uri' moved to '_id'.

        Returns:
            [rec] when a document was inserted, otherwise an empty list.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # this is not yet noticing updates
            return []
        else:
            log.debug("add record %s", rec)
            d = cast(Dict[str, Any], rec.copy())
            d['_id'] = d.pop('uri')
            self.collection.insert_one(d)
            return [rec]

    def updateGraphs(self, currentRecords: Iterable[Record]):
        """Replace both graphs with content built from currentRecords.

        The agenda graph gets the records kept by limitDays(..., days=2);
        the countdown graph gets up to 15 starred records, tagged with the
        EV['CountdownEvent'] class.
        """
        # Materialize once: the records are consumed twice below.
        currentRecords = list(currentRecords)
        self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2)))
        self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))