view ingest.py @ 28:e2209226b001

rewrite with starlette and background_loop
author drewp@bigasterisk.com
date Sun, 24 Jul 2022 00:58:54 -0700
parents
children d686e4a5b892
line wrap: on
line source

import logging
import re
from typing import Any, Dict, Iterable, List, Sequence, cast

import pymongo.collection
from dateutil.tz import tzlocal
from googleapiclient.discovery import Resource
from patchablegraph import PatchableGraph
from rdflib import Namespace

from calendar_connection import getCalendarService
from datetimemath import dayRange, limitDays, parse
from localtypes import Conf, Record
from graphconvert import asGraph

# Root logger (getLogger() called with no name); handler/level setup is
# presumably done by the entry point — confirm.
log = logging.getLogger()
# RDF namespace for event vocabulary terms, e.g. EV['CountdownEvent'], which
# SyncToMongo.updateGraphs adds as an extra class on countdown events.
EV = Namespace("http://bigasterisk.com/event#")


def feedFromCalId(conf: Conf, calId: str) -> str:
    """Build the feed URI for a Google calendar id.

    The result is the configured ``event_uri_ns`` prefix, then ``feed/``,
    then the calendar id exactly as given (no escaping is applied).
    """
    namespace = conf['event_uri_ns']
    suffix = 'feed/' + calId
    return namespace + suffix


def getFirstPageOfCalendars(service: Resource):
    """Yield the ids of the calendars on the first page of the calendar list.

    Only one ``calendarList().list()`` request is made, so calendars on any
    later result pages are not yielded. Because this is a generator, the
    request is only issued once iteration begins.
    """
    response = service.calendarList().list().execute()
    yield from (entry['id'] for entry in response['items'])


def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
    """Convert one Google Calendar API event resource into a Record dict.

    ``ev['start']`` and ``ev['end']`` are each expected to hold either a
    ``'date'`` string (all-day events) or a ``'dateTime'`` string. For each
    endpoint the record gets:

    * ``start`` / ``end``: the raw date or dateTime string,
    * ``startTime`` / ``endTime``: the value from ``parse``; for all-day
      events the parsed value is given the local timezone (``tzlocal()``),
    * ``startDate`` / ``endDate``: the date as an ISO ``YYYY-MM-DD`` string.

    Optional event fields fall back to defaults: title ``'?'``, empty
    ``htmlLink``/``creatorEmail``, ``endTimeUnspecified`` False.
    """

    def timing(field):
        """Return (raw string, parsed time, ISO date) for ev['start'|'end']."""
        when = ev[field]
        if 'date' in when:
            day = when['date']
            return day, parse(day).replace(tzinfo=tzlocal()), day
        stamp = when['dateTime']
        moment = parse(stamp)  # parsed once, reused for the date below
        return stamp, moment, moment.date().isoformat()

    start, startTime, startDate = timing('start')
    end, endTime, endDate = timing('end')

    # Key order matches the previous implementation (base fields first, then
    # the derived Time/Date fields), since it is visible in stored documents.
    rec = {
        'uri': conf['event_uri_ns'] + ev['id'],
        'feed': feedFromCalId(conf, calId),
        'title': ev.get('summary', '?'),
        'start': start,
        'end': end,
        'endTimeUnspecified': ev.get('endTimeUnspecified', False),
        'htmlLink': ev.get('htmlLink', ''),
        'creatorEmail': ev.get('creator', {}).get('email', ''),
        'startTime': startTime,
        'startDate': startDate,
        'endTime': endTime,
        'endDate': endDate,
    }
    return rec


def filterStarred(recs: Sequence[Record], maxCount=15) -> List[Record]:
    """Return up to ``maxCount`` "starred" records, ordered by ``start``.

    A record is starred when its title ends with ``*``, optionally followed
    by whitespace (e.g. ``"Birthday *"``). A non-positive ``maxCount``
    yields an empty list.

    NOTE(review): ``start`` is the raw date/dateTime string, so this is a
    string sort; events whose timestamps carry different UTC offsets may not
    sort chronologically.
    """
    starPattern = re.compile(r'(.*)\*\s*$')
    starred = [rec for rec in recs if starPattern.search(rec['title'])]
    # list.sort is stable, so filtering first gives the same order as sorting
    # every record and then filtering, while sorting fewer items.
    starred.sort(key=lambda r: r['start'])
    # max(..., 0) stops a negative count from slicing off the end.
    return starred[:max(maxCount, 0)]


class SyncToMongo(object):
    """Reads Google Calendar events and writes them to MongoDB.

    After each sync it also rebuilds two PatchableGraphs from the records it
    just read: an agenda graph (records passed through
    ``limitDays(..., days=2)``) and a countdown graph (records selected by
    ``filterStarred``, tagged with ``EV['CountdownEvent']``).
    """
    collection: pymongo.collection.Collection

    def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph,
                 countdownGraph: PatchableGraph):
        self.conf = conf
        self.service = getCalendarService()
        self.collection = collection
        self.agendaGraph = agendaGraph
        self.countdownGraph = countdownGraph

    def update(self, days=10, cal=None) -> int:
        """Re-read events in the ``dayRange(days)`` window and store them.

        Mongo documents whose ``startTime`` falls in the window (limited to
        calendar ``cal``'s feed when ``cal`` is given) are deleted first.
        Every event the API returns for the window is then inserted again,
        and the graphs are rebuilt from the records read in this call.

        Args:
            days: window size, passed to ``dayRange``.
            cal: optional Google calendar id; when set, only that calendar
                is read.

        Returns:
            The number of events read whose URI was not among the documents
            deleted at the start, i.e. events new to this window.
        """
        start, end = dayRange(days)
        spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
        if cal is not None:
            spec['feed'] = feedFromCalId(self.conf, cal)
        # A set, because membership is tested once per event below.
        idsFormerlyInRange = {doc['_id'] for doc in self.collection.find(spec)}
        # delete_many returns a DeleteResult; log the count, not the object.
        deleted = self.collection.delete_many(spec).deleted_count
        log.info('cleared %s records before reread', deleted)

        totalNew = 0
        currentRecords: List[Record] = []
        for calId in getFirstPageOfCalendars(self.service):
            if cal and calId != cal:
                continue
            log.info('read %s', calId)
            for ev in self._eventsInWindow(calId, start, end):
                rec = recordFromEv(self.conf, calId, ev)
                self.upsertMongo(rec)
                if rec['uri'] not in idsFormerlyInRange:
                    totalNew += 1
                currentRecords.append(rec)

        # NOTE(review): when ``cal`` is given, the graphs are rebuilt from that
        # one calendar's events only — confirm that is intended.
        self.updateGraphs(currentRecords)
        return totalNew

    def _eventsInWindow(self, calId: str, start, end) -> Iterable[Dict]:
        """Yield every event of calendar ``calId`` between start and end.

        Follows result pages, so windows with more than ``maxResults``
        events are not silently truncated.
        """
        events = self.service.events()
        request = events.list(
            calendarId=calId,
            singleEvents=True,
            timeMin=start.isoformat(),
            timeMax=end.isoformat(),
            showDeleted=False,
            maxResults=1000,
        )
        # list_next() builds the request for the following page, or returns
        # None once the response carries no nextPageToken.
        while request is not None:
            response = request.execute()
            yield from response.get('items', [])
            request = events.list_next(request, response)

    def upsertMongo(self, rec: Record) -> List[Record]:
        """Insert ``rec`` unless a document with ``_id == rec['uri']`` exists.

        The stored document is a copy of ``rec`` with ``uri`` renamed to
        ``_id``; ``rec`` itself is not modified.

        Returns:
            ``[rec]`` if it was inserted, ``[]`` if a document already existed.
        """
        if self.collection.find_one({"_id": rec['uri']}) is not None:
            log.debug("existing record %s", rec['uri'])
            # Existing documents are left untouched, so event changes are not
            # written here (update() deletes in-window documents beforehand).
            return []
        log.debug("add record %s", rec)
        d = cast(Dict[str, Any], rec.copy())
        d['_id'] = d.pop('uri')
        self.collection.insert_one(d)
        return [rec]

    def updateGraphs(self, currentRecords: Iterable[Record]):
        """Replace both graphs' contents with graphs built from the records."""
        currentRecords = list(currentRecords)  # consumed twice below
        self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2)))
        countdown = filterStarred(currentRecords, maxCount=15)
        self.countdownGraph.setToGraph(
            asGraph(countdown, extraClasses=[EV['CountdownEvent']]))