Mercurial > code > home > repos > homeauto
annotate service/collector/collector.py @ 1578:807282fb3136
fix deploy; redo stats display page
author | drewp@bigasterisk.com |
---|---|
date | Thu, 26 Aug 2021 18:03:30 -0700 |
parents | fafe86ae0b03 |
children |
rev | line source |
---|---|
296 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
296 | 9 """ |
794 | 10 import collections |
11 import json | |
12 import logging | |
13 import time | |
14 from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) | |
692 | 15 |
794 | 16 import cyclone.sse |
17 import cyclone.web | |
18 from docopt import docopt | |
302 | 19 from patchablegraph import jsonFromPatch |
794 | 20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource |
21 from prometheus_client import Counter, Gauge, Histogram, Summary | |
22 from prometheus_client.exposition import generate_latest | |
23 from prometheus_client.registry import REGISTRY | |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
24 from rdfdb.patch import Patch |
794 | 25 from rdflib import Namespace, URIRef |
26 from rdflib.term import Node, Statement | |
27 from standardservice.logsetup import enableTwistedLog, log | |
28 from twisted.internet import defer, reactor | |
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
29 |
715
fe9cfc088a49
consolidate debug page into ./index.html for now
drewp@bigasterisk.com
parents:
693
diff
changeset
|
30 from collector_config import config |
302 | 31 |
794 | 32 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
# NewType('SourceUri', URIRef) was tried and doesn't work here, so a trivial
# subclass gives source URIs their own static type instead.
class SourceUri(URIRef):
    """A URIRef naming one upstream patch source (COLLECTOR is one too)."""
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
36 |
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
37 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
# Vocabulary for the statements this service injects about its sources:
# ROOM['source'] / ROOM['state'] predicates and the state values themselves.
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# The collector itself: subject of the ROOM['source'] statements and the
# graph (4th quad element) that those local statements are stored under.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
40 |
def _callsSummary(name: str) -> Summary:
    """Build the prometheus Summary (description 'calls') used to time one code path."""
    return Summary(name, 'calls')


# One timer per instrumented function; each is used as a ``@X.time()`` decorator.
GET_STATE_CALLS = _callsSummary("get_state_calls")
LOCAL_STATEMENTS_PATCH_CALLS = _callsSummary("local_statements_patch_calls")
MAKE_SYNC_PATCH_CALLS = _callsSummary("make_sync_patch_calls")
ON_PATCH_CALLS = _callsSummary("on_patch_calls")
SEND_UPDATE_PATCH_CALLS = _callsSummary("send_update_patch_calls")
REPLACE_SOURCE_STATEMENTS_CALLS = _callsSummary("replace_source_statements_calls")
47 | |
48 | |
class Metrics(cyclone.web.RequestHandler):
    """Serve the prometheus text exposition of the default REGISTRY."""

    def get(self):
        from prometheus_client.exposition import CONTENT_TYPE_LATEST

        # set_header (not add_header): the handler already carries a default
        # Content-Type, and add_header would append a second, conflicting one.
        self.set_header('Content-Type', CONTENT_TYPE_LATEST)
        self.write(generate_latest(REGISTRY))
54 | |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
55 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """

    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
        # applyPatch(graph, patch) receives every change this class makes;
        # the graph argument is always COLLECTOR.
        self.applyPatch = applyPatch
        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: current state URIRef

    @LOCAL_STATEMENTS_PATCH_CALLS.time()
    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.

        - first state for a source: adds (COLLECTOR source <source>) and
          (<source> state <state>)
        - state=None: deletes both of those statements
        - otherwise: swaps the old state statement for the new one
        Setting the state it already has is a no-op (no patch emitted).
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)
        if oldState is None:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
        elif state is None:
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
        else:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (source, ROOM['state'], state, COLLECTOR),
            ], delQuads=[
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
94 | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
95 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """Shorten a few well-known URI prefixes in a URIRef for logs and error messages.

    Anything that is not a URIRef is returned unchanged.
    """
    if not isinstance(t, URIRef):
        return t
    shortened = t
    # applied in this order, each replacing every occurrence of its prefix
    for longForm, shortForm in (
        ('http://projects.bigasterisk.com/room/', 'room:'),
        ('http://projects.bigasterisk.com/device/', 'dev:'),
        ('http://bigasterisk.com/sse_collector/', 'sc:'),
    ):
        shortened = shortened.replace(longForm, shortForm)
    return shortened
101 | |
794 | 102 |
def abbrevStmt(stmt: Statement) -> str:
    """Render the first four terms of a quad as '(s p o g)' with abbreviated URIs."""
    subj, pred, obj, ctx = stmt[0], stmt[1], stmt[2], stmt[3]
    return '(' + ' '.join(str(abbrevTerm(term)) for term in (subj, pred, obj, ctx)) + ')'
105 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
106 |
class PatchSink(cyclone.sse.SSEHandler):
    """
    One SSE client connection on /graph/<streamId>. Registers itself with
    the shared GraphClients on bind() and unregisters on unbind().
    """
    _handlerSerial = 0  # class-wide counter; numbers handlers for __repr__

    def __init__(self, application: cyclone.web.Application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.bound = False
        self.created = time.time()
        self.graphClients = self.settings.graphClients
        # Real value is assigned in bind(); defined here so state() never
        # raises AttributeError on a handler that hasn't been bound yet.
        self.streamId = None

        self._serial = PatchSink._handlerSerial
        PatchSink._handlerSerial += 1
        self.lastPatchSentTime: float = 0.0

    def __repr__(self) -> str:
        return '<Handler #%s>' % self._serial

    def state(self) -> Dict:
        """Summary of this connection for the /state page."""
        return {
            'created': round(self.created, 2),
            'ageHours': round((time.time() - self.created) / 3600, 2),
            'streamId': self.streamId,
            'remoteIp': self.request.remote_ip,  # wrong, need some forwarded-for thing
            'foafAgent': self.request.headers.get('X-Foaf-Agent'),
            'userAgent': self.request.headers.get('user-agent'),
        }

    def bind(self, *args, **kwargs):
        """args[0] is the stream id captured from the /graph/(.+) route."""
        self.streamId = args[0]

        self.graphClients.addSseHandler(self)
        # If something goes wrong with addSseHandler, I don't want to
        # try removeSseHandler.
        self.bound = True

    def unbind(self) -> None:
        if self.bound:
            # Clear the flag first so a repeated unbind() can't remove this
            # handler twice (removeSseHandler is not idempotent).
            self.bound = False
            self.graphClients.removeSseHandler(self)
296 | 144 |
692 | 145 |
# statement -> (sources currently asserting it, handlers that have been sent it)
StatementTable = Dict[
    Statement,
    Tuple[Set[SourceUri], Set[PatchSink]],
]
470 | 147 |
148 | |
class PostDeleter(object):
    """
    Context manager that collects keys to remove from a StatementTable and
    deletes them only when the ``with`` body finishes, so callers can mark
    rows for removal while iterating over the table.

    If the body raises, nothing is deleted and the original exception
    propagates unchanged.
    """

    def __init__(self, statements: 'StatementTable'):
        self.statements = statements

    def __enter__(self):
        self._garbage: List[Statement] = []
        return self

    def add(self, stmt: 'Statement'):
        """Mark stmt for deletion when the with-block exits normally."""
        self._garbage.append(stmt)

    def __exit__(self, excType, excValue, excTb):
        if excType is not None:
            # Leave the table as-is; returning False re-raises the caller's
            # own exception instead of masking it with a different one.
            return False
        for stmt in self._garbage:
            # pop, not del: a stmt marked more than once must not KeyError
            self.statements.pop(stmt, None)
        return False
166 | |
692 | 167 |
class ActiveStatements(object):
    """
    Table of every statement the collector currently knows about.

    Each row maps a statement to (sources asserting it, handlers that have
    been sent it). Rows whose two sets are both empty are removed.
    """

    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))

    def state(self) -> Dict:
        return {
            'len': len(self.table),
        }

    def postDeleteStatements(self) -> PostDeleter:
        """Context manager for marking rows to delete after iterating the table."""
        return PostDeleter(self.table)

    def pprintTable(self) -> None:
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))

    @MAKE_SYNC_PATCH_CALLS.time()
    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
        """
        Return the patch that brings `handler` in line with the statements
        asserted by any of `sources`, and record that the handler now has
        exactly those statements.
        """
        # todo: this could run all handlers at once, which is how we
        # use it anyway
        adds = []
        dels = []

        with self.postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.table.items():
                belongsInHandler = not sources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source: SourceUri, p: Patch):
        """
        Record that `source` now asserts p.addQuads and no longer asserts
        p.delQuads. Raises ValueError on a duplicate add or a delete of a
        statement the source wasn't asserting.
        """
        for stmt in p.addQuads:
            sourceUrls, handlers = self.table[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self.postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get rather than [] so a bogus delete doesn't leave a new
                # empty row in the defaultdict behind
                row = self.table.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
        """
        Make `stmts` the complete set of statements asserted by `source`
        (used when a source resends its full graph).
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        wanted = set(stmts)  # O(1) membership instead of scanning the sequence per row
        newStmts = set(wanted)  # wanted stmts that have no table row yet

        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                if stmt in wanted:
                    # Row already exists: credit this source (no-op if it
                    # already was) and don't re-add it below, which would
                    # trip applySourcePatch's "already had" check.
                    sources.add(source)
                    newStmts.discard(stmt)
                elif source in sources:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler: PatchSink):
        """Forget that `handler` has any statements; drop rows left empty."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source: SourceUri):
        """Forget everything `source` asserted; drop rows left empty."""
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)
263 | |
264 | |
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """

    def __init__(self):
        self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
        self.handlers: Set[PatchSink] = set()
        self.statements: ActiveStatements = ActiveStatements()

        # The collector's statements about its own sources flow through the
        # same _onPatch path as real sources, under the COLLECTOR graph.
        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        return {
            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Sources configured for the handler's stream id, plus COLLECTOR.
        Raises ValueError unless exactly one config['streams'] entry matches.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]

    @ON_PATCH_CALLS.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
        """Fold a patch from `source` into the table and push updates to handlers."""
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])

    @SEND_UPDATE_PATCH_CALLS.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements (or out every registered handler if handler is None)
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            if h.lastPatchSentTime > now - period:
                # NOTE(review): a skipped handler catches up only on the next
                # call; no timer is visible here that flushes it later.
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            # guard so the JSON repr is only built when it will be logged
            if log.isEnabledFor(logging.DEBUG):
                log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def addSseHandler(self, handler: PatchSink):
        """Register a handler, connecting to any of its sources not yet connected."""
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(source,
                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
                                                               reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """Unregister a handler and stop sources no other handler still uses."""
        log.info('removeSseHandler %r', handler)
        # Unregister first. Stopping a client below emits a COLLECTOR patch
        # (setSourceState(url, None) -> _onPatch -> _sendUpdatePatch) that
        # runs makeSyncPatch for every registered handler; if this handler
        # were still registered it could be put back into the table.
        self.handlers.remove(handler)
        self.statements.discardHandler(handler)
        for source in self._sourcesForHandler(handler):
            stillUsed = any(source in self._sourcesForHandler(other) for other in self.handlers)
            if not stillUsed:
                self._stopClient(source)

    def _stopClient(self, url: SourceUri):
        """Disconnect from `url` and forget everything it asserted."""
        if url == COLLECTOR:
            return

        # tolerate a client that is already gone instead of raising KeyError
        client = self.clients.pop(url, None)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
692 | 402 |
470 | 403 |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """Serve a JSON snapshot of GraphClients.state()."""

    @GET_STATE_CALLS.time()
    def get(self) -> None:
        try:
            state = self.settings.graphClients.state()
            # the body is JSON, so say so (the default content type is not)
            self.set_header('Content-Type', 'application/json')
            self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>'))
        except Exception:
            import traceback
            traceback.print_exc()
            raise
693
be2fbdbdf549
collector: add /graphlist, plus logging updates
drewp@bigasterisk.com
parents:
692
diff
changeset
|
415 |
794 | 416 |
693
be2fbdbdf549
collector: add /graphlist, plus logging updates
drewp@bigasterisk.com
parents:
692
diff
changeset
|
class GraphList(cyclone.web.RequestHandler):
    """Serve the configured streams (config['streams']) as JSON."""

    def get(self) -> None:
        self.set_header('Content-Type', 'application/json')
        self.write(json.dumps(config['streams']))
be2fbdbdf549
collector: add /graphlist, plus logging updates
drewp@bigasterisk.com
parents:
692
diff
changeset
|
421 |
794 | 422 |
if __name__ == '__main__':
    # An explicit "Options:" section with two-space descriptions lets docopt
    # (0.6.1 and 0.6.2 alike) register -v/-i as flags, so arg['-v'] exists.
    arg = docopt("""
    Usage: collector.py [options]

    Options:
      -v  Verbose
      -i  Info level only
    """)

    if arg['-v'] or arg['-i']:
        enableTwistedLog()
        log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
        defer.setDebugging(True)

    graphClients = GraphClients()

    reactor.listenTCP(
        9072,
        cyclone.web.Application(
            handlers=[
                (r"/()", cyclone.web.StaticFileHandler, {
                    "path": ".",
                    "default_filename": "index.html"
                }),
                (r'/state', State),
                (r'/graph/', GraphList),
                (r'/graph/(.+)', PatchSink),
                (r'/metrics', Metrics),
            ],
            graphClients=graphClients,
        ),
        interface='::',
    )
    reactor.run()