comparison collector.py @ 12:032e59be8fe9

refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author drewp@bigasterisk.com
date Fri, 25 Nov 2022 20:58:08 -0800
parents 36471461685f
children bfd95926be6e
comparison
equal deleted inserted replaced
11:baf886e01ed1 12:032e59be8fe9
5 5
6 Future: 6 Future:
7 - filter out unneeded stmts from the sources 7 - filter out unneeded stmts from the sources
8 - give a time resolution and concatenate any patches that come faster than that res 8 - give a time resolution and concatenate any patches that come faster than that res
9 """ 9 """
10 import collections
11 import json 10 import json
12 import logging 11 import logging
13 import time 12 import time
14 from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union) 13 from typing import Dict, List, Optional, Set, Union
15 14
16 import cyclone.sse 15 import cyclone.sse
17 import cyclone.web 16 import cyclone.web
18 from docopt import docopt 17 from docopt import docopt
19 from patchablegraph.patchablegraph import jsonFromPatch 18 from patchablegraph.patchablegraph import jsonFromPatch
21 from prometheus_client import Summary 20 from prometheus_client import Summary
22 from prometheus_client.exposition import generate_latest 21 from prometheus_client.exposition import generate_latest
23 from prometheus_client.registry import REGISTRY 22 from prometheus_client.registry import REGISTRY
24 from rdfdb.patch import Patch 23 from rdfdb.patch import Patch
25 from rdflib import Namespace, URIRef 24 from rdflib import Namespace, URIRef
26 from rdflib.term import Node
27 from standardservice.logsetup import enableTwistedLog, log 25 from standardservice.logsetup import enableTwistedLog, log
28 from twisted.internet import defer, reactor 26 from twisted.internet import defer, reactor
29 27
30 from collector_config import config 28 from collector_config import config
29 from merge import SourceUri, ActiveStatements, LocalStatements
30 from patchsink import PatchSink
31 31
32 import cyclone.sse 32 import cyclone.sse
33 def py3_sendEvent(self, message, event=None, eid=None, retry=None): 33 def py3_sendEvent(self, message, event=None, eid=None, retry=None):
34 34
35 if isinstance(message, dict): 35 if isinstance(message, dict):
42 if event: 42 if event:
43 self.transport.write(b"event: %s\n" % event) 43 self.transport.write(b"event: %s\n" % event)
44 if retry: 44 if retry:
45 self.transport.write(b"retry: %s\n" % retry) 45 self.transport.write(b"retry: %s\n" % retry)
46 self.transport.write(b"data: %s\n\n" % message) 46 self.transport.write(b"data: %s\n\n" % message)
47
48
47 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent 49 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent
48
49
50 Statement = Tuple[Node, Node, Node, Node]
51
52
53 # SourceUri = NewType('SourceUri', URIRef) # doesn't work
54 class SourceUri(URIRef):
55 pass
56 50
57 51
58 ROOM = Namespace("http://projects.bigasterisk.com/room/") 52 ROOM = Namespace("http://projects.bigasterisk.com/room/")
59 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) 53 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
60 54
61 GET_STATE_CALLS = Summary("get_state_calls", 'calls') 55 GET_STATE_CALLS = Summary("get_state_calls", 'calls')
62 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
63 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
64 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') 56 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
65 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') 57 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
66 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
67 58
68 59
69 class Metrics(cyclone.web.RequestHandler): 60 class Metrics(cyclone.web.RequestHandler):
70 61
71 def get(self): 62 def get(self):
72 self.add_header('content-type', 'text/plain') 63 self.add_header('content-type', 'text/plain')
73 self.write(generate_latest(REGISTRY)) 64 self.write(generate_latest(REGISTRY))
74 65
75
76 class LocalStatements(object):
77 """
78 functions that make statements originating from sse_collector itself
79 """
80
81 def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]):
82 self.applyPatch = applyPatch
83 self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef
84
85 @LOCAL_STATEMENTS_PATCH_CALLS.time()
86 def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
87 """
88 add a patch to the COLLECTOR graph about the state of this
89 source. state=None to remove the source.
90 """
91 oldState = self._sourceState.get(source, None)
92 if state == oldState:
93 return
94 log.info('source state %s -> %s', source, state)
95 if oldState is None:
96 self._sourceState[source] = state
97 self.applyPatch(COLLECTOR, Patch(addQuads=[
98 (COLLECTOR, ROOM['source'], source, COLLECTOR),
99 (source, ROOM['state'], state, COLLECTOR),
100 ]))
101 elif state is None:
102 del self._sourceState[source]
103 self.applyPatch(COLLECTOR, Patch(delQuads=[
104 (COLLECTOR, ROOM['source'], source, COLLECTOR),
105 (source, ROOM['state'], oldState, COLLECTOR),
106 ]))
107 else:
108 self._sourceState[source] = state
109 self.applyPatch(COLLECTOR, Patch(addQuads=[
110 (source, ROOM['state'], state, COLLECTOR),
111 ], delQuads=[
112 (source, ROOM['state'], oldState, COLLECTOR),
113 ]))
114
115
116 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
117 if isinstance(t, URIRef):
118 return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
119 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
120 return t
121
122
123 def abbrevStmt(stmt: Statement) -> str:
124 return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
125
126
127 class PatchSink(cyclone.sse.SSEHandler):
128 _handlerSerial = 0
129
130 def __init__(self, application: cyclone.web.Application, request):
131 cyclone.sse.SSEHandler.__init__(self, application, request)
132 self.bound = False
133 self.created = time.time()
134 self.graphClients = self.settings.graphClients
135
136 self._serial = PatchSink._handlerSerial
137 PatchSink._handlerSerial += 1
138 self.lastPatchSentTime: float = 0.0
139
140 def __repr__(self) -> str:
141 return '<Handler #%s>' % self._serial
142
143 def state(self) -> Dict:
144 return {
145 'created': round(self.created, 2),
146 'ageHours': round((time.time() - self.created) / 3600, 2),
147 'streamId': self.streamId,
148 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing
149 'foafAgent': self.request.headers.get('X-Foaf-Agent'),
150 'userAgent': self.request.headers.get('user-agent'),
151 }
152
153 def bind(self, *args, **kwargs):
154 self.streamId = args[0]
155
156 self.graphClients.addSseHandler(self)
157 # If something goes wrong with addSseHandler, I don't want to
158 # try removeSseHandler.
159 self.bound = True
160
161 def unbind(self) -> None:
162 if self.bound:
163 self.graphClients.removeSseHandler(self)
164
165
166 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]
167
168
169 class PostDeleter(object):
170
171 def __init__(self, statements: StatementTable):
172 self.statements = statements
173
174 def __enter__(self):
175 self._garbage: List[Statement] = []
176 return self
177
178 def add(self, stmt: Statement):
179 self._garbage.append(stmt)
180
181 def __exit__(self, type, value, traceback):
182 if type is not None:
183 raise NotImplementedError()
184 for stmt in self._garbage:
185 del self.statements[stmt]
186
187
188 class ActiveStatements(object):
189
190 def __init__(self):
191 # This table holds statements asserted by any of our sources
192 # plus local statements that we introduce (source is
193 # http://bigasterisk.com/sse_collector/).
194 self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))
195
196 def state(self) -> Dict:
197 return {
198 'len': len(self.table),
199 }
200
201 def postDeleteStatements(self) -> PostDeleter:
202 return PostDeleter(self.table)
203
204 def pprintTable(self) -> None:
205 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
206 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
207
208 @MAKE_SYNC_PATCH_CALLS.time()
209 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
210 # todo: this could run all handlers at once, which is how we
211 # use it anyway
212 adds = []
213 dels = []
214
215 with self.postDeleteStatements() as garbage:
216 for stmt, (stmtSources, handlers) in self.table.items():
217 belongsInHandler = not sources.isdisjoint(stmtSources)
218 handlerHasIt = handler in handlers
219 # log.debug("%s belong=%s has=%s",
220 # abbrevStmt(stmt), belongsInHandler, handlerHasIt)
221 if belongsInHandler and not handlerHasIt:
222 adds.append(stmt)
223 handlers.add(handler)
224 elif not belongsInHandler and handlerHasIt:
225 dels.append(stmt)
226 handlers.remove(handler)
227 if not handlers and not stmtSources:
228 garbage.add(stmt)
229
230 return Patch(addQuads=adds, delQuads=dels)
231
232 def applySourcePatch(self, source: SourceUri, p: Patch):
233 for stmt in p.addQuads:
234 sourceUrls, handlers = self.table[stmt]
235 if source in sourceUrls:
236 raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
237 sourceUrls.add(source)
238
239 with self.postDeleteStatements() as garbage:
240 for stmt in p.delQuads:
241 sourceUrls, handlers = self.table[stmt]
242 if source not in sourceUrls:
243 raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
244 sourceUrls.remove(source)
245 # this is rare, since some handler probably still has
246 # the stmt we're deleting, but it can happen e.g. when
247 # a handler was just deleted
248 if not sourceUrls and not handlers:
249 garbage.add(stmt)
250
251 @REPLACE_SOURCE_STATEMENTS_CALLS.time()
252 def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
253 log.debug('replaceSourceStatements with %s stmts', len(stmts))
254 newStmts = set(stmts)
255
256 with self.postDeleteStatements() as garbage:
257 for stmt, (sources, handlers) in self.table.items():
258 if source in sources:
259 if stmt not in stmts:
260 sources.remove(source)
261 if not sources and not handlers:
262 garbage.add(stmt)
263 else:
264 if stmt in stmts:
265 sources.add(source)
266 newStmts.discard(stmt)
267
268 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
269
270 def discardHandler(self, handler: PatchSink):
271 with self.postDeleteStatements() as garbage:
272 for stmt, (sources, handlers) in self.table.items():
273 handlers.discard(handler)
274 if not sources and not handlers:
275 garbage.add(stmt)
276
277 def discardSource(self, source: SourceUri):
278 with self.postDeleteStatements() as garbage:
279 for stmt, (sources, handlers) in self.table.items():
280 sources.discard(source)
281 if not sources and not handlers:
282 garbage.add(stmt)
283 66
284 67
285 class GraphClients(object): 68 class GraphClients(object):
286 """ 69 """
287 All the active PatchSources and SSEHandlers 70 All the active PatchSources and SSEHandlers
463 handlers=[ 246 handlers=[
464 (r'/state', State), 247 (r'/state', State),
465 (r'/graph/', GraphList), 248 (r'/graph/', GraphList),
466 (r'/graph/(.+)', PatchSink), 249 (r'/graph/(.+)', PatchSink),
467 (r'/metrics', Metrics), 250 (r'/metrics', Metrics),
468 ], 251 ], graphClients=graphClients),
469 graphClients=graphClients),
470 interface='::') 252 interface='::')
471 reactor.run() 253 reactor.run()