comparison ingest.py @ 38:d686e4a5b892

refactor; attempt clearer errors
author drewp@bigasterisk.com
date Sun, 12 Nov 2023 23:21:10 -0800
parents e2209226b001
children 7d9609edcf9c
comparison
equal deleted inserted replaced
37:2da773e48a57 38:d686e4a5b892
1 import logging 1 import logging
2 import os
2 import re 3 import re
3 from typing import Any, Dict, Iterable, List, Sequence, cast 4 from typing import Any, Dict, Iterable, List, Sequence, cast
4 5
5 import pymongo.collection 6 import pymongo.collection
6 from dateutil.tz import tzlocal 7 from dateutil.tz import tzlocal
7 from googleapiclient.discovery import Resource 8 from googleapiclient.discovery import Resource
9 from googleapiclient.errors import HttpError
8 from patchablegraph import PatchableGraph 10 from patchablegraph import PatchableGraph
9 from rdflib import Namespace 11 from rdflib import Namespace
10 12
11 from calendar_connection import getCalendarService 13 from calendar_connection import getCalendarService
12 from datetimemath import dayRange, limitDays, parse 14 from datetimemath import dayRange, limitDays, parse
15 from graphconvert import asGraph
13 from localtypes import Conf, Record 16 from localtypes import Conf, Record
14 from graphconvert import asGraph
15 17
16 log = logging.getLogger() 18 log = logging.getLogger()
17 EV = Namespace("http://bigasterisk.com/event#") 19 EV = Namespace("http://bigasterisk.com/event#")
18 20
19 21
77 self.agendaGraph = agendaGraph 79 self.agendaGraph = agendaGraph
78 self.countdownGraph = countdownGraph 80 self.countdownGraph = countdownGraph
79 81
80 def update(self, days=10, cal=None) -> int: 82 def update(self, days=10, cal=None) -> int:
81 start, end = dayRange(days) 83 start, end = dayRange(days)
84 idsFormerlyInRange = self.clearByStartTime(cal, start, end)
85
86 totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)
87
88
89 self.updateGraphs(currentRecords)
90 return totalNew
91
92 def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
93 totalNew = 0
94 currentRecords = []
95 try:
96 calIds = getFirstPageOfCalendars(self.service)
97 except HttpError:
98 log.error('on getFirstPageOfCalendars')
99 os.abort()
100 for calId in calIds:
101 if cal and calId != cal:
102 continue
103 try:
104 self.updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId)
105 except HttpError:
106 log.error(f"on cal {calId}")
107 return totalNew,currentRecords
108
109 def clearByStartTime(self, cal, start, end):
82 spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} 110 spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
83 if cal is not None: 111 if cal is not None:
84 spec['feed'] = feedFromCalId(self.conf, cal) 112 spec['feed'] = feedFromCalId(self.conf, cal)
85 idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)] 113 idsFormerlyInRange = [doc['_id'] for doc in self.collection.find(spec)]
86 n = self.collection.delete_many(spec) 114 n = self.collection.delete_many(spec)
87 log.info(f'cleared {n} records before reread') 115 log.info(f'cleared {n} records before reread')
116 return idsFormerlyInRange
88 117
89 totalNew = 0 118 def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId):
90 currentRecords = [] 119 print('read %s' % calId)
91 for calId in getFirstPageOfCalendars(self.service): 120 events = self.service.events().list(
92 if cal and calId != cal:
93 continue
94 print('read %s' % calId)
95 events = self.service.events().list(
96 calendarId=calId, 121 calendarId=calId,
97 singleEvents=True, 122 singleEvents=True,
98 timeMin=start.isoformat(), 123 timeMin=start.isoformat(),
99 timeMax=end.isoformat(), 124 timeMax=end.isoformat(),
100 showDeleted=False, 125 showDeleted=False,
101 maxResults=1000, 126 maxResults=1000,
102 ).execute() 127 ).execute()
103 128
104 for ev in events['items']: 129 for ev in events['items']:
105 rec = recordFromEv(self.conf, calId, ev) 130 rec = recordFromEv(self.conf, calId, ev)
106 self.upsertMongo(rec) 131 self.upsertMongo(rec)
107 if rec['uri'] not in idsFormerlyInRange: 132 if rec['uri'] not in idsFormerlyInRange:
108 totalNew += 1 133 totalNew += 1
109 currentRecords.append(rec) 134 currentRecords.append(rec)
110
111 self.updateGraphs(currentRecords)
112 return totalNew
113 135
114 def upsertMongo(self, rec: Record) -> List[Record]: 136 def upsertMongo(self, rec: Record) -> List[Record]:
115 if self.collection.find_one({"_id": rec['uri']}) is not None: 137 if self.collection.find_one({"_id": rec['uri']}) is not None:
116 log.debug("existing record %s", rec['uri']) 138 log.debug("existing record %s", rec['uri'])
117 # this is not yet noticing updates 139 # this is not yet noticing updates