Mercurial > code > home > repos > collector
comparison collector.py @ 12:032e59be8fe9
refactor to separate the nonweb stuff a bit, in prep for cyclone->starlette
author | drewp@bigasterisk.com |
---|---|
date | Fri, 25 Nov 2022 20:58:08 -0800 |
parents | 36471461685f |
children | bfd95926be6e |
comparison legend: equal, deleted, inserted, replaced
11:baf886e01ed1 | 12:032e59be8fe9 |
---|---|
5 | 5 |
6 Future: | 6 Future: |
7 - filter out unneeded stmts from the sources | 7 - filter out unneeded stmts from the sources |
8 - give a time resolution and concatenate any patches that come faster than that res | 8 - give a time resolution and concatenate any patches that come faster than that res |
9 """ | 9 """ |
10 import collections | |
11 import json | 10 import json |
12 import logging | 11 import logging |
13 import time | 12 import time |
14 from typing import (Callable, Dict, List, Optional, Sequence, Set, Tuple, Union) | 13 from typing import Dict, List, Optional, Set, Union |
15 | 14 |
16 import cyclone.sse | 15 import cyclone.sse |
17 import cyclone.web | 16 import cyclone.web |
18 from docopt import docopt | 17 from docopt import docopt |
19 from patchablegraph.patchablegraph import jsonFromPatch | 18 from patchablegraph.patchablegraph import jsonFromPatch |
21 from prometheus_client import Summary | 20 from prometheus_client import Summary |
22 from prometheus_client.exposition import generate_latest | 21 from prometheus_client.exposition import generate_latest |
23 from prometheus_client.registry import REGISTRY | 22 from prometheus_client.registry import REGISTRY |
24 from rdfdb.patch import Patch | 23 from rdfdb.patch import Patch |
25 from rdflib import Namespace, URIRef | 24 from rdflib import Namespace, URIRef |
26 from rdflib.term import Node | |
27 from standardservice.logsetup import enableTwistedLog, log | 25 from standardservice.logsetup import enableTwistedLog, log |
28 from twisted.internet import defer, reactor | 26 from twisted.internet import defer, reactor |
29 | 27 |
30 from collector_config import config | 28 from collector_config import config |
29 from merge import SourceUri, ActiveStatements, LocalStatements | |
30 from patchsink import PatchSink | |
31 | 31 |
32 import cyclone.sse | 32 import cyclone.sse |
33 def py3_sendEvent(self, message, event=None, eid=None, retry=None): | 33 def py3_sendEvent(self, message, event=None, eid=None, retry=None): |
34 | 34 |
35 if isinstance(message, dict): | 35 if isinstance(message, dict): |
42 if event: | 42 if event: |
43 self.transport.write(b"event: %s\n" % event) | 43 self.transport.write(b"event: %s\n" % event) |
44 if retry: | 44 if retry: |
45 self.transport.write(b"retry: %s\n" % retry) | 45 self.transport.write(b"retry: %s\n" % retry) |
46 self.transport.write(b"data: %s\n\n" % message) | 46 self.transport.write(b"data: %s\n\n" % message) |
47 | |
48 | |
47 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent | 49 cyclone.sse.SSEHandler.sendEvent = py3_sendEvent |
48 | |
49 | |
50 Statement = Tuple[Node, Node, Node, Node] | |
51 | |
52 | |
53 # SourceUri = NewType('SourceUri', URIRef) # doesn't work | |
54 class SourceUri(URIRef): | |
55 pass | |
56 | 50 |
57 | 51 |
58 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 52 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
59 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) | 53 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) |
60 | 54 |
61 GET_STATE_CALLS = Summary("get_state_calls", 'calls') | 55 GET_STATE_CALLS = Summary("get_state_calls", 'calls') |
62 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') | |
63 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') | |
64 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') | 56 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') |
65 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') | 57 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') |
66 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') | |
67 | 58 |
68 | 59 |
69 class Metrics(cyclone.web.RequestHandler): | 60 class Metrics(cyclone.web.RequestHandler): |
70 | 61 |
71 def get(self): | 62 def get(self): |
72 self.add_header('content-type', 'text/plain') | 63 self.add_header('content-type', 'text/plain') |
73 self.write(generate_latest(REGISTRY)) | 64 self.write(generate_latest(REGISTRY)) |
74 | 65 |
75 | |
76 class LocalStatements(object): | |
77 """ | |
78 functions that make statements originating from sse_collector itself | |
79 """ | |
80 | |
81 def __init__(self, applyPatch: Callable[[SourceUri, Patch], None]): | |
82 self.applyPatch = applyPatch | |
83 self._sourceState: Dict[SourceUri, Optional[URIRef]] = {} # source: state URIRef | |
84 | |
85 @LOCAL_STATEMENTS_PATCH_CALLS.time() | |
86 def setSourceState(self, source: SourceUri, state: Optional[URIRef]): | |
87 """ | |
88 add a patch to the COLLECTOR graph about the state of this | |
89 source. state=None to remove the source. | |
90 """ | |
91 oldState = self._sourceState.get(source, None) | |
92 if state == oldState: | |
93 return | |
94 log.info('source state %s -> %s', source, state) | |
95 if oldState is None: | |
96 self._sourceState[source] = state | |
97 self.applyPatch(COLLECTOR, Patch(addQuads=[ | |
98 (COLLECTOR, ROOM['source'], source, COLLECTOR), | |
99 (source, ROOM['state'], state, COLLECTOR), | |
100 ])) | |
101 elif state is None: | |
102 del self._sourceState[source] | |
103 self.applyPatch(COLLECTOR, Patch(delQuads=[ | |
104 (COLLECTOR, ROOM['source'], source, COLLECTOR), | |
105 (source, ROOM['state'], oldState, COLLECTOR), | |
106 ])) | |
107 else: | |
108 self._sourceState[source] = state | |
109 self.applyPatch(COLLECTOR, Patch(addQuads=[ | |
110 (source, ROOM['state'], state, COLLECTOR), | |
111 ], delQuads=[ | |
112 (source, ROOM['state'], oldState, COLLECTOR), | |
113 ])) | |
114 | |
115 | |
116 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: | |
117 if isinstance(t, URIRef): | |
118 return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', | |
119 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) | |
120 return t | |
121 | |
122 | |
123 def abbrevStmt(stmt: Statement) -> str: | |
124 return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) | |
125 | |
126 | |
127 class PatchSink(cyclone.sse.SSEHandler): | |
128 _handlerSerial = 0 | |
129 | |
130 def __init__(self, application: cyclone.web.Application, request): | |
131 cyclone.sse.SSEHandler.__init__(self, application, request) | |
132 self.bound = False | |
133 self.created = time.time() | |
134 self.graphClients = self.settings.graphClients | |
135 | |
136 self._serial = PatchSink._handlerSerial | |
137 PatchSink._handlerSerial += 1 | |
138 self.lastPatchSentTime: float = 0.0 | |
139 | |
140 def __repr__(self) -> str: | |
141 return '<Handler #%s>' % self._serial | |
142 | |
143 def state(self) -> Dict: | |
144 return { | |
145 'created': round(self.created, 2), | |
146 'ageHours': round((time.time() - self.created) / 3600, 2), | |
147 'streamId': self.streamId, | |
148 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing | |
149 'foafAgent': self.request.headers.get('X-Foaf-Agent'), | |
150 'userAgent': self.request.headers.get('user-agent'), | |
151 } | |
152 | |
153 def bind(self, *args, **kwargs): | |
154 self.streamId = args[0] | |
155 | |
156 self.graphClients.addSseHandler(self) | |
157 # If something goes wrong with addSseHandler, I don't want to | |
158 # try removeSseHandler. | |
159 self.bound = True | |
160 | |
161 def unbind(self) -> None: | |
162 if self.bound: | |
163 self.graphClients.removeSseHandler(self) | |
164 | |
165 | |
166 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] | |
167 | |
168 | |
169 class PostDeleter(object): | |
170 | |
171 def __init__(self, statements: StatementTable): | |
172 self.statements = statements | |
173 | |
174 def __enter__(self): | |
175 self._garbage: List[Statement] = [] | |
176 return self | |
177 | |
178 def add(self, stmt: Statement): | |
179 self._garbage.append(stmt) | |
180 | |
181 def __exit__(self, type, value, traceback): | |
182 if type is not None: | |
183 raise NotImplementedError() | |
184 for stmt in self._garbage: | |
185 del self.statements[stmt] | |
186 | |
187 | |
188 class ActiveStatements(object): | |
189 | |
190 def __init__(self): | |
191 # This table holds statements asserted by any of our sources | |
192 # plus local statements that we introduce (source is | |
193 # http://bigasterisk.com/sse_collector/). | |
194 self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) | |
195 | |
196 def state(self) -> Dict: | |
197 return { | |
198 'len': len(self.table), | |
199 } | |
200 | |
201 def postDeleteStatements(self) -> PostDeleter: | |
202 return PostDeleter(self.table) | |
203 | |
204 def pprintTable(self) -> None: | |
205 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): | |
206 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) | |
207 | |
208 @MAKE_SYNC_PATCH_CALLS.time() | |
209 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): | |
210 # todo: this could run all handlers at once, which is how we | |
211 # use it anyway | |
212 adds = [] | |
213 dels = [] | |
214 | |
215 with self.postDeleteStatements() as garbage: | |
216 for stmt, (stmtSources, handlers) in self.table.items(): | |
217 belongsInHandler = not sources.isdisjoint(stmtSources) | |
218 handlerHasIt = handler in handlers | |
219 # log.debug("%s belong=%s has=%s", | |
220 # abbrevStmt(stmt), belongsInHandler, handlerHasIt) | |
221 if belongsInHandler and not handlerHasIt: | |
222 adds.append(stmt) | |
223 handlers.add(handler) | |
224 elif not belongsInHandler and handlerHasIt: | |
225 dels.append(stmt) | |
226 handlers.remove(handler) | |
227 if not handlers and not stmtSources: | |
228 garbage.add(stmt) | |
229 | |
230 return Patch(addQuads=adds, delQuads=dels) | |
231 | |
232 def applySourcePatch(self, source: SourceUri, p: Patch): | |
233 for stmt in p.addQuads: | |
234 sourceUrls, handlers = self.table[stmt] | |
235 if source in sourceUrls: | |
236 raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) | |
237 sourceUrls.add(source) | |
238 | |
239 with self.postDeleteStatements() as garbage: | |
240 for stmt in p.delQuads: | |
241 sourceUrls, handlers = self.table[stmt] | |
242 if source not in sourceUrls: | |
243 raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) | |
244 sourceUrls.remove(source) | |
245 # this is rare, since some handler probably still has | |
246 # the stmt we're deleting, but it can happen e.g. when | |
247 # a handler was just deleted | |
248 if not sourceUrls and not handlers: | |
249 garbage.add(stmt) | |
250 | |
251 @REPLACE_SOURCE_STATEMENTS_CALLS.time() | |
252 def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): | |
253 log.debug('replaceSourceStatements with %s stmts', len(stmts)) | |
254 newStmts = set(stmts) | |
255 | |
256 with self.postDeleteStatements() as garbage: | |
257 for stmt, (sources, handlers) in self.table.items(): | |
258 if source in sources: | |
259 if stmt not in stmts: | |
260 sources.remove(source) | |
261 if not sources and not handlers: | |
262 garbage.add(stmt) | |
263 else: | |
264 if stmt in stmts: | |
265 sources.add(source) | |
266 newStmts.discard(stmt) | |
267 | |
268 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) | |
269 | |
270 def discardHandler(self, handler: PatchSink): | |
271 with self.postDeleteStatements() as garbage: | |
272 for stmt, (sources, handlers) in self.table.items(): | |
273 handlers.discard(handler) | |
274 if not sources and not handlers: | |
275 garbage.add(stmt) | |
276 | |
277 def discardSource(self, source: SourceUri): | |
278 with self.postDeleteStatements() as garbage: | |
279 for stmt, (sources, handlers) in self.table.items(): | |
280 sources.discard(source) | |
281 if not sources and not handlers: | |
282 garbage.add(stmt) | |
283 | 66 |
284 | 67 |
285 class GraphClients(object): | 68 class GraphClients(object): |
286 """ | 69 """ |
287 All the active PatchSources and SSEHandlers | 70 All the active PatchSources and SSEHandlers |
463 handlers=[ | 246 handlers=[ |
464 (r'/state', State), | 247 (r'/state', State), |
465 (r'/graph/', GraphList), | 248 (r'/graph/', GraphList), |
466 (r'/graph/(.+)', PatchSink), | 249 (r'/graph/(.+)', PatchSink), |
467 (r'/metrics', Metrics), | 250 (r'/metrics', Metrics), |
468 ], | 251 ], graphClients=graphClients), |
469 graphClients=graphClients), | |
470 interface='::') | 252 interface='::') |
471 reactor.run() | 253 reactor.run() |