comparison ingest.py @ 42:7d9609edcf9c

track calendar feed summary/description text and emit them in graphs
author drewp@bigasterisk.com
date Sun, 18 Feb 2024 12:34:53 -0800
parents d686e4a5b892
children e53a1bc87f99
comparison
equal deleted inserted replaced
41:ab54c9f65f76 42:7d9609edcf9c
11 from rdflib import Namespace 11 from rdflib import Namespace
12 12
13 from calendar_connection import getCalendarService 13 from calendar_connection import getCalendarService
14 from datetimemath import dayRange, limitDays, parse 14 from datetimemath import dayRange, limitDays, parse
15 from graphconvert import asGraph 15 from graphconvert import asGraph
16 from localtypes import Conf, Record 16 from localtypes import Conf, Record, feedFromCalId
17 17
18 log = logging.getLogger() 18 log = logging.getLogger()
19 EV = Namespace("http://bigasterisk.com/event#") 19 EV = Namespace("http://bigasterisk.com/event#")
20 20
21 21
22 def feedFromCalId(conf: Conf, calId: str) -> str:
23 return conf['event_uri_ns'] + 'feed/' + calId
24 22
25 23
26 def getFirstPageOfCalendars(service: Resource): 24 def getFirstPageOfCalendars(service: Resource) -> Iterable[tuple[str, str | None, str | None]]:
27 for row in service.calendarList().list().execute()['items']: 25 for row in service.calendarList().list().execute()['items']:
28 yield row['id'] 26 yield row['id'], row.get('summary'), row.get('description')
29 27
30 28
31 def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record: 29 def recordFromEv(conf: Conf, calId: str, ev: Dict) -> Record:
32 30
33 def dateOrTime(d): 31 def dateOrTime(d):
34 if 'date' in d: 32 if 'date' in d:
35 return d['date'] 33 return d['date']
36 return d['dateTime'] 34 return d['dateTime']
37 35
38 rec = { 36 rec= {
39 'uri': conf['event_uri_ns'] + ev['id'], 37 'uri': conf['event_uri_ns'] + ev['id'],
40 'feed': feedFromCalId(conf, calId), 38 'feed': feedFromCalId(conf, calId),
41 'title': ev.get('summary', '?'), 39 'title': ev.get('summary', '?'),
42 'start': dateOrTime(ev['start']), 40 'start': dateOrTime(ev['start']),
43 'end': dateOrTime(ev['end']), 41 'end': dateOrTime(ev['end']),
66 break 64 break
67 return out 65 return out
68 66
69 67
70 class SyncToMongo(object): 68 class SyncToMongo(object):
71 """reads gcal, writes to mongodb""" 69 """reads gcal, writes to mongodb and graphs"""
72 collection: pymongo.collection.Collection 70 collection: pymongo.collection.Collection
73 71
74 def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, 72 def __init__(self, conf: Conf, collection: pymongo.collection.Collection, agendaGraph: PatchableGraph, countdownGraph: PatchableGraph):
75 countdownGraph: PatchableGraph):
76 self.conf = conf 73 self.conf = conf
77 self.service = getCalendarService() 74 self.service = getCalendarService()
78 self.collection = collection 75 self.collection = collection
76 self.calendarsCollection = collection.database.get_collection('gcalendar_cals')
79 self.agendaGraph = agendaGraph 77 self.agendaGraph = agendaGraph
80 self.countdownGraph = countdownGraph 78 self.countdownGraph = countdownGraph
81 79
82 def update(self, days=10, cal=None) -> int: 80 def update(self, days=10, cal=None) -> int:
83 start, end = dayRange(days) 81 start, end = dayRange(days)
84 idsFormerlyInRange = self.clearByStartTime(cal, start, end) 82 idsFormerlyInRange = self.clearByStartTime(cal, start, end)
85 83
86 totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange) 84 totalNew, currentRecords = self.gatherNewEventsInRange(cal, start, end, idsFormerlyInRange)
87 85
88
89 self.updateGraphs(currentRecords) 86 self.updateGraphs(currentRecords)
90 return totalNew 87 return totalNew
91 88
92 def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange): 89 def gatherNewEventsInRange(self, cal, start, end, idsFormerlyInRange):
93 totalNew = 0 90 totalNew = 0
94 currentRecords = [] 91 currentRecords = []
95 try: 92 try:
96 calIds = getFirstPageOfCalendars(self.service) 93 cals = getFirstPageOfCalendars(self.service)
97 except HttpError: 94 except HttpError:
98 log.error('on getFirstPageOfCalendars') 95 log.error('on getFirstPageOfCalendars')
99 os.abort() 96 os.abort()
100 for calId in calIds: 97 for calId, summary, description in cals:
98 self.calendarsCollection.update_one({'_id': calId}, {'$set': {
99 'summary': summary,
100 'description': description,
101 }}, upsert=True)
101 if cal and calId != cal: 102 if cal and calId != cal:
102 continue 103 continue
103 try: 104 try:
104 self.updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId) 105 self.updateOneCal(start, end, idsFormerlyInRange, totalNew, currentRecords, calId)
105 except HttpError: 106 except HttpError:
106 log.error(f"on cal {calId}") 107 log.error(f"on cal {calId}")
107 return totalNew,currentRecords 108 return totalNew, currentRecords
108 109
109 def clearByStartTime(self, cal, start, end): 110 def clearByStartTime(self, cal, start, end):
110 spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}} 111 spec: Dict[str, Any] = {"startTime": {"$gte": start, "$lte": end}}
111 if cal is not None: 112 if cal is not None:
112 spec['feed'] = feedFromCalId(self.conf, cal) 113 spec['feed'] = feedFromCalId(self.conf, cal)
116 return idsFormerlyInRange 117 return idsFormerlyInRange
117 118
118 def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId): 119 def updateOneCal(self, start, end, idsFormerlyInRange, totalNew, currentRecords, calId):
119 print('read %s' % calId) 120 print('read %s' % calId)
120 events = self.service.events().list( 121 events = self.service.events().list(
121 calendarId=calId, 122 calendarId=calId,
122 singleEvents=True, 123 singleEvents=True,
123 timeMin=start.isoformat(), 124 timeMin=start.isoformat(),
124 timeMax=end.isoformat(), 125 timeMax=end.isoformat(),
125 showDeleted=False, 126 showDeleted=False,
126 maxResults=1000, 127 maxResults=1000,
127 ).execute() 128 ).execute()
128 129
129 for ev in events['items']: 130 for ev in events['items']:
130 rec = recordFromEv(self.conf, calId, ev) 131 rec = recordFromEv(self.conf, calId, ev)
131 self.upsertMongo(rec) 132 self.upsertMongo(rec)
132 if rec['uri'] not in idsFormerlyInRange: 133 if rec['uri'] not in idsFormerlyInRange:
145 self.collection.insert_one(d) 146 self.collection.insert_one(d)
146 return [rec] 147 return [rec]
147 148
148 def updateGraphs(self, currentRecords: Iterable[Record]): 149 def updateGraphs(self, currentRecords: Iterable[Record]):
149 currentRecords = list(currentRecords) 150 currentRecords = list(currentRecords)
150 self.agendaGraph.setToGraph(asGraph(limitDays(currentRecords, days=2))) 151 cals = list(self.calendarsCollection.find())
151 self.countdownGraph.setToGraph(asGraph(filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']])) 152 self.agendaGraph.setToGraph(asGraph(self.conf, cals, limitDays(currentRecords, days=2)))
153 self.countdownGraph.setToGraph(asGraph(self.conf, cals, filterStarred(currentRecords, maxCount=15), extraClasses=[EV['CountdownEvent']]))