Mercurial > code > home > repos > homeauto
comparison service/collector/collector.py @ 794:fafe86ae0b03
py3, k8s, prometheus updates
author | drewp@bigasterisk.com |
---|---|
date | Sat, 26 Dec 2020 17:00:08 -0800 |
parents | 6d84cd3eb277 |
children |
comparison
equal
deleted
inserted
replaced
793:c3e3bd5dfa0b | 794:fafe86ae0b03 |
---|---|
5 | 5 |
6 Future: | 6 Future: |
7 - filter out unneeded stmts from the sources | 7 - filter out unneeded stmts from the sources |
8 - give a time resolution and concatenate any patches that come faster than that res | 8 - give a time resolution and concatenate any patches that come faster than that res |
9 """ | 9 """ |
10 import collections | |
11 import json | |
12 import logging | |
13 import time | |
14 from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union) | |
15 | |
16 import cyclone.sse | |
17 import cyclone.web | |
10 from docopt import docopt | 18 from docopt import docopt |
11 from greplin import scales | |
12 from greplin.scales.cyclonehandler import StatsHandler | |
13 from rdflib import Namespace, URIRef | |
14 | |
15 from typing import TYPE_CHECKING | |
16 if TYPE_CHECKING: | |
17 from rdflib import StatementType | |
18 else: | |
19 class StatementType: pass # type: ignore | |
20 | |
21 | |
22 from rdflib.term import Node | |
23 from twisted.internet import reactor, defer | |
24 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional | |
25 import cyclone.web, cyclone.sse | |
26 import logging, collections, json, time | |
27 | |
28 from standardservice.logsetup import log, enableTwistedLog | |
29 from patchablegraph import jsonFromPatch | 19 from patchablegraph import jsonFromPatch |
20 from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource | |
21 from prometheus_client import Counter, Gauge, Histogram, Summary | |
22 from prometheus_client.exposition import generate_latest | |
23 from prometheus_client.registry import REGISTRY | |
30 from rdfdb.patch import Patch | 24 from rdfdb.patch import Patch |
31 | 25 from rdflib import Namespace, URIRef |
32 from patchablegraph.patchsource import ReconnectingPatchSource | 26 from rdflib.term import Node, Statement |
27 from standardservice.logsetup import enableTwistedLog, log | |
28 from twisted.internet import defer, reactor | |
33 | 29 |
34 from collector_config import config | 30 from collector_config import config |
35 | 31 |
32 | |
36 #SourceUri = NewType('SourceUri', URIRef) # doesn't work | 33 #SourceUri = NewType('SourceUri', URIRef) # doesn't work |
37 class SourceUri(URIRef): pass | 34 class SourceUri(URIRef): |
35 pass | |
38 | 36 |
39 | 37 |
40 ROOM = Namespace("http://projects.bigasterisk.com/room/") | 38 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
41 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) | 39 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/')) |
42 | 40 |
43 STATS = scales.collection('/root', | 41 GET_STATE_CALLS = Summary("get_state_calls", 'calls') |
44 scales.PmfStat('getState'), | 42 LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls') |
45 scales.PmfStat('localStatementsPatch'), | 43 MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls') |
46 scales.PmfStat('makeSyncPatch'), | 44 ON_PATCH_CALLS = Summary("on_patch_calls", 'calls') |
47 scales.PmfStat('onPatch'), | 45 SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls') |
48 scales.PmfStat('sendUpdatePatch'), | 46 REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls') |
49 scales.PmfStat('replaceSourceStatements'), | 47 |
50 ) | 48 |
49 class Metrics(cyclone.web.RequestHandler): | |
50 | |
51 def get(self): | |
52 self.add_header('content-type', 'text/plain') | |
53 self.write(generate_latest(REGISTRY)) | |
54 | |
51 | 55 |
52 class LocalStatements(object): | 56 class LocalStatements(object): |
53 """ | 57 """ |
54 functions that make statements originating from sse_collector itself | 58 functions that make statements originating from sse_collector itself |
55 """ | 59 """ |
60 | |
56 def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): | 61 def __init__(self, applyPatch: Callable[[URIRef, Patch], None]): |
57 self.applyPatch = applyPatch | 62 self.applyPatch = applyPatch |
58 self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef | 63 self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef |
59 | 64 |
60 @STATS.localStatementsPatch.time() | 65 @LOCAL_STATEMENTS_PATCH_CALLS.time() |
61 def setSourceState(self, source: SourceUri, state: URIRef): | 66 def setSourceState(self, source: SourceUri, state: URIRef): |
62 """ | 67 """ |
63 add a patch to the COLLECTOR graph about the state of this | 68 add a patch to the COLLECTOR graph about the state of this |
64 source. state=None to remove the source. | 69 source. state=None to remove the source. |
65 """ | 70 """ |
79 (COLLECTOR, ROOM['source'], source, COLLECTOR), | 84 (COLLECTOR, ROOM['source'], source, COLLECTOR), |
80 (source, ROOM['state'], oldState, COLLECTOR), | 85 (source, ROOM['state'], oldState, COLLECTOR), |
81 ])) | 86 ])) |
82 else: | 87 else: |
83 self._sourceState[source] = state | 88 self._sourceState[source] = state |
84 self.applyPatch(COLLECTOR, Patch( | 89 self.applyPatch(COLLECTOR, Patch(addQuads=[ |
85 addQuads=[ | |
86 (source, ROOM['state'], state, COLLECTOR), | 90 (source, ROOM['state'], state, COLLECTOR), |
87 ], | 91 ], delQuads=[ |
88 delQuads=[ | 92 (source, ROOM['state'], oldState, COLLECTOR), |
89 (source, ROOM['state'], oldState, COLLECTOR), | 93 ])) |
90 ])) | 94 |
91 | 95 |
92 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: | 96 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]: |
93 if isinstance(t, URIRef): | 97 if isinstance(t, URIRef): |
94 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') | 98 return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/', |
95 .replace('http://projects.bigasterisk.com/device/', 'dev:') | 99 'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:')) |
96 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) | |
97 return t | 100 return t |
98 | 101 |
99 def abbrevStmt(stmt: StatementType) -> str: | 102 |
100 return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), | 103 def abbrevStmt(stmt: Statement) -> str: |
101 abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) | 104 return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3])) |
105 | |
102 | 106 |
103 class PatchSink(cyclone.sse.SSEHandler): | 107 class PatchSink(cyclone.sse.SSEHandler): |
104 _handlerSerial = 0 | 108 _handlerSerial = 0 |
109 | |
105 def __init__(self, application: cyclone.web.Application, request): | 110 def __init__(self, application: cyclone.web.Application, request): |
106 cyclone.sse.SSEHandler.__init__(self, application, request) | 111 cyclone.sse.SSEHandler.__init__(self, application, request) |
107 self.bound = False | 112 self.bound = False |
108 self.created = time.time() | 113 self.created = time.time() |
109 self.graphClients = self.settings.graphClients | 114 self.graphClients = self.settings.graphClients |
118 def state(self) -> Dict: | 123 def state(self) -> Dict: |
119 return { | 124 return { |
120 'created': round(self.created, 2), | 125 'created': round(self.created, 2), |
121 'ageHours': round((time.time() - self.created) / 3600, 2), | 126 'ageHours': round((time.time() - self.created) / 3600, 2), |
122 'streamId': self.streamId, | 127 'streamId': self.streamId, |
123 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing | 128 'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing |
124 'foafAgent': self.request.headers.get('X-Foaf-Agent'), | 129 'foafAgent': self.request.headers.get('X-Foaf-Agent'), |
125 'userAgent': self.request.headers.get('user-agent'), | 130 'userAgent': self.request.headers.get('user-agent'), |
126 } | 131 } |
127 | 132 |
128 def bind(self, *args, **kwargs): | 133 def bind(self, *args, **kwargs): |
136 def unbind(self) -> None: | 141 def unbind(self) -> None: |
137 if self.bound: | 142 if self.bound: |
138 self.graphClients.removeSseHandler(self) | 143 self.graphClients.removeSseHandler(self) |
139 | 144 |
140 | 145 |
141 StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]] | 146 StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]] |
142 | 147 |
143 | 148 |
144 class PostDeleter(object): | 149 class PostDeleter(object): |
150 | |
145 def __init__(self, statements: StatementTable): | 151 def __init__(self, statements: StatementTable): |
146 self.statements = statements | 152 self.statements = statements |
147 | 153 |
148 def __enter__(self): | 154 def __enter__(self): |
149 self._garbage: List[StatementType] = [] | 155 self._garbage: List[Statement] = [] |
150 return self | 156 return self |
151 | 157 |
152 def add(self, stmt: StatementType): | 158 def add(self, stmt: Statement): |
153 self._garbage.append(stmt) | 159 self._garbage.append(stmt) |
154 | 160 |
155 def __exit__(self, type, value, traceback): | 161 def __exit__(self, type, value, traceback): |
156 if type is not None: | 162 if type is not None: |
157 raise | 163 raise NotImplementedError() |
158 for stmt in self._garbage: | 164 for stmt in self._garbage: |
159 del self.statements[stmt] | 165 del self.statements[stmt] |
160 | 166 |
161 | 167 |
162 class ActiveStatements(object): | 168 class ActiveStatements(object): |
169 | |
163 def __init__(self): | 170 def __init__(self): |
164 # This table holds statements asserted by any of our sources | 171 # This table holds statements asserted by any of our sources |
165 # plus local statements that we introduce (source is | 172 # plus local statements that we introduce (source is |
166 # http://bigasterisk.com/sse_collector/). | 173 # http://bigasterisk.com/sse_collector/). |
167 self.table: StatementTable = collections.defaultdict( | 174 self.table: StatementTable = collections.defaultdict(lambda: (set(), set())) |
168 lambda: (set(), set())) | |
169 | 175 |
170 def state(self) -> Dict: | 176 def state(self) -> Dict: |
171 return { | 177 return { |
172 'len': len(self.table), | 178 'len': len(self.table), |
173 } | 179 } |
174 | 180 |
175 def postDeleteStatements(self) -> PostDeleter: | 181 def postDeleteStatements(self) -> PostDeleter: |
176 return PostDeleter(self.table) | 182 return PostDeleter(self.table) |
177 | 183 |
178 def pprintTable(self) -> None: | 184 def pprintTable(self) -> None: |
179 for i, (stmt, (sources, handlers)) in enumerate( | 185 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())): |
180 sorted(self.table.items())): | 186 print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)) |
181 print("%03d. %-80s from %s to %s" % ( | 187 |
182 i, | 188 @MAKE_SYNC_PATCH_CALLS.time() |
183 abbrevStmt(stmt), | |
184 [abbrevTerm(s) for s in sources], | |
185 handlers)) | |
186 | |
187 @STATS.makeSyncPatch.time() | |
188 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): | 189 def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]): |
189 # todo: this could run all handlers at once, which is how we | 190 # todo: this could run all handlers at once, which is how we |
190 # use it anyway | 191 # use it anyway |
191 adds = [] | 192 adds = [] |
192 dels = [] | 193 dels = [] |
193 | 194 |
194 with self.postDeleteStatements() as garbage: | 195 with self.postDeleteStatements() as garbage: |
195 for stmt, (stmtSources, handlers) in self.table.items(): | 196 for stmt, (stmtSources, handlers) in self.table.items(): |
196 belongsInHandler = not sources.isdisjoint(stmtSources) | 197 belongsInHandler = not sources.isdisjoint(stmtSources) |
197 handlerHasIt = handler in handlers | 198 handlerHasIt = handler in handlers |
198 #log.debug("%s belong=%s has=%s", | 199 # log.debug("%s belong=%s has=%s", |
199 # abbrevStmt(stmt), belongsInHandler, handlerHasIt) | 200 # abbrevStmt(stmt), belongsInHandler, handlerHasIt) |
200 if belongsInHandler and not handlerHasIt: | 201 if belongsInHandler and not handlerHasIt: |
201 adds.append(stmt) | 202 adds.append(stmt) |
202 handlers.add(handler) | 203 handlers.add(handler) |
203 elif not belongsInHandler and handlerHasIt: | 204 elif not belongsInHandler and handlerHasIt: |
204 dels.append(stmt) | 205 dels.append(stmt) |
210 | 211 |
211 def applySourcePatch(self, source: SourceUri, p: Patch): | 212 def applySourcePatch(self, source: SourceUri, p: Patch): |
212 for stmt in p.addQuads: | 213 for stmt in p.addQuads: |
213 sourceUrls, handlers = self.table[stmt] | 214 sourceUrls, handlers = self.table[stmt] |
214 if source in sourceUrls: | 215 if source in sourceUrls: |
215 raise ValueError("%s added stmt that it already had: %s" % | 216 raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt))) |
216 (source, abbrevStmt(stmt))) | |
217 sourceUrls.add(source) | 217 sourceUrls.add(source) |
218 | 218 |
219 with self.postDeleteStatements() as garbage: | 219 with self.postDeleteStatements() as garbage: |
220 for stmt in p.delQuads: | 220 for stmt in p.delQuads: |
221 sourceUrls, handlers = self.table[stmt] | 221 sourceUrls, handlers = self.table[stmt] |
222 if source not in sourceUrls: | 222 if source not in sourceUrls: |
223 raise ValueError("%s deleting stmt that it didn't have: %s" % | 223 raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt))) |
224 (source, abbrevStmt(stmt))) | |
225 sourceUrls.remove(source) | 224 sourceUrls.remove(source) |
226 # this is rare, since some handler probably still has | 225 # this is rare, since some handler probably still has |
227 # the stmt we're deleting, but it can happen e.g. when | 226 # the stmt we're deleting, but it can happen e.g. when |
228 # a handler was just deleted | 227 # a handler was just deleted |
229 if not sourceUrls and not handlers: | 228 if not sourceUrls and not handlers: |
230 garbage.add(stmt) | 229 garbage.add(stmt) |
231 | 230 |
232 @STATS.replaceSourceStatements.time() | 231 @REPLACE_SOURCE_STATEMENTS_CALLS.time() |
233 def replaceSourceStatements(self, source: SourceUri, | 232 def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]): |
234 stmts: Sequence[StatementType]): | |
235 log.debug('replaceSourceStatements with %s stmts', len(stmts)) | 233 log.debug('replaceSourceStatements with %s stmts', len(stmts)) |
236 newStmts = set(stmts) | 234 newStmts = set(stmts) |
237 | 235 |
238 with self.postDeleteStatements() as garbage: | 236 with self.postDeleteStatements() as garbage: |
239 for stmt, (sources, handlers) in self.table.items(): | 237 for stmt, (sources, handlers) in self.table.items(): |
262 sources.discard(source) | 260 sources.discard(source) |
263 if not sources and not handlers: | 261 if not sources and not handlers: |
264 garbage.add(stmt) | 262 garbage.add(stmt) |
265 | 263 |
266 | 264 |
267 | |
268 class GraphClients(object): | 265 class GraphClients(object): |
269 """ | 266 """ |
270 All the active PatchSources and SSEHandlers | 267 All the active PatchSources and SSEHandlers |
271 | 268 |
272 To handle all the overlapping-statement cases, we store a set of | 269 To handle all the overlapping-statement cases, we store a set of |
273 true statements along with the sources that are currently | 270 true statements along with the sources that are currently |
274 asserting them and the requesters who currently know them. As | 271 asserting them and the requesters who currently know them. As |
275 statements come and go, we make patches to send to requesters. | 272 statements come and go, we make patches to send to requesters. |
276 """ | 273 """ |
274 | |
277 def __init__(self): | 275 def __init__(self): |
278 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) | 276 self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed) |
279 self.handlers: Set[PatchSink] = set() | 277 self.handlers: Set[PatchSink] = set() |
280 self.statements: ActiveStatements = ActiveStatements() | 278 self.statements: ActiveStatements = ActiveStatements() |
281 | 279 |
282 self._localStatements = LocalStatements(self._onPatch) | 280 self._localStatements = LocalStatements(self._onPatch) |
283 | 281 |
284 def state(self) -> Dict: | 282 def state(self) -> Dict: |
285 return { | 283 return { |
286 'clients': sorted([ps.state() for ps in self.clients.values()], | 284 'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']), |
287 key=lambda r: r['reconnectedPatchSource']['url']), | 285 'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])), |
288 'sseHandlers': sorted([h.state() for h in self.handlers], | |
289 key=lambda r: (r['streamId'], r['created'])), | |
290 'statements': self.statements.state(), | 286 'statements': self.statements.state(), |
291 } | 287 } |
292 | 288 |
293 def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]: | 289 def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]: |
294 streamId = handler.streamId | 290 streamId = handler.streamId |
295 matches = [s for s in config['streams'] if s['id'] == streamId] | 291 matches = [s for s in config['streams'] if s['id'] == streamId] |
296 if len(matches) != 1: | 292 if len(matches) != 1: |
297 raise ValueError("%s matches for %r" % (len(matches), streamId)) | 293 raise ValueError("%s matches for %r" % (len(matches), streamId)) |
298 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [ | 294 return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR] |
299 COLLECTOR] | 295 |
300 | 296 @ON_PATCH_CALLS.time() |
301 @STATS.onPatch.time() | 297 def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False): |
302 def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False): | |
303 if fullGraph: | 298 if fullGraph: |
304 # a reconnect may need to resend the full graph even | 299 # a reconnect may need to resend the full graph even |
305 # though we've already sent some statements | 300 # though we've already sent some statements |
306 self.statements.replaceSourceStatements(source, p.addQuads) | 301 self.statements.replaceSourceStatements(source, p.addQuads) |
307 else: | 302 else: |
311 | 306 |
312 if log.isEnabledFor(logging.DEBUG): | 307 if log.isEnabledFor(logging.DEBUG): |
313 self.statements.pprintTable() | 308 self.statements.pprintTable() |
314 | 309 |
315 if source != COLLECTOR: | 310 if source != COLLECTOR: |
316 self._localStatements.setSourceState( | 311 self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived']) |
317 source, | 312 |
318 ROOM['fullGraphReceived'] if fullGraph else | 313 @SEND_UPDATE_PATCH_CALLS.time() |
319 ROOM['patchesReceived']) | 314 def _sendUpdatePatch(self, handler: Optional[PatchSink] = None): |
320 | |
321 @STATS.sendUpdatePatch.time() | |
322 def _sendUpdatePatch(self, handler: Optional[PatchSink]=None): | |
323 """ | 315 """ |
324 send a patch event out this handler to bring it up to date with | 316 send a patch event out this handler to bring it up to date with |
325 self.statements | 317 self.statements |
326 """ | 318 """ |
327 now = time.time() | 319 now = time.time() |
345 # This can be a giant line, which was a problem | 337 # This can be a giant line, which was a problem |
346 # once. Might be nice for this service to try to break | 338 # once. Might be nice for this service to try to break |
347 # it up into multiple sends, although there's no | 339 # it up into multiple sends, although there's no |
348 # guarantee at all since any single stmt could be any | 340 # guarantee at all since any single stmt could be any |
349 # length. | 341 # length. |
350 h.sendEvent(message=jsonFromPatch(p).encode('utf8'), | 342 h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch') |
351 event=b'patch') | |
352 h.lastPatchSentTime = now | 343 h.lastPatchSentTime = now |
353 else: | 344 else: |
354 log.debug('nothing to send to %s', h) | 345 log.debug('nothing to send to %s', h) |
355 | 346 |
356 def addSseHandler(self, handler: PatchSink): | 347 def addSseHandler(self, handler: PatchSink): |
363 | 354 |
364 for source in sources: | 355 for source in sources: |
365 if source not in self.clients and source != COLLECTOR: | 356 if source not in self.clients and source != COLLECTOR: |
366 log.debug('connect to patch source %s', source) | 357 log.debug('connect to patch source %s', source) |
367 self._localStatements.setSourceState(source, ROOM['connect']) | 358 self._localStatements.setSourceState(source, ROOM['connect']) |
368 self.clients[source] = ReconnectingPatchSource( | 359 self.clients[source] = ReconnectingPatchSource(source, |
369 source, | 360 listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph), |
370 listener=lambda p, fullGraph, source=source: self._onPatch( | 361 reconnectSecs=10) |
371 source, p, fullGraph), | |
372 reconnectSecs=10) | |
373 log.debug('bring new client up to date') | 362 log.debug('bring new client up to date') |
374 | 363 |
375 self._sendUpdatePatch(handler) | 364 self._sendUpdatePatch(handler) |
376 | 365 |
377 def removeSseHandler(self, handler: PatchSink): | 366 def removeSseHandler(self, handler: PatchSink): |
378 log.info('removeSseHandler %r', handler) | 367 log.info('removeSseHandler %r', handler) |
379 self.statements.discardHandler(handler) | 368 self.statements.discardHandler(handler) |
380 for source in self._sourcesForHandler(handler): | 369 for source in self._sourcesForHandler(handler): |
381 for otherHandler in self.handlers: | 370 for otherHandler in self.handlers: |
382 if (otherHandler != handler and | 371 if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)): |
383 source in self._sourcesForHandler(otherHandler)): | |
384 # still in use | 372 # still in use |
385 break | 373 break |
386 else: | 374 else: |
387 self._stopClient(source) | 375 self._stopClient(source) |
388 | 376 |
412 if not sources and not any(h in self.handlers for h in handlers): | 400 if not sources and not any(h in self.handlers for h in handlers): |
413 garbage.add(stmt) | 401 garbage.add(stmt) |
414 | 402 |
415 | 403 |
416 class State(cyclone.web.RequestHandler): | 404 class State(cyclone.web.RequestHandler): |
417 @STATS.getState.time() | 405 |
406 @GET_STATE_CALLS.time() | |
418 def get(self) -> None: | 407 def get(self) -> None: |
419 try: | 408 try: |
420 state = self.settings.graphClients.state() | 409 state = self.settings.graphClients.state() |
421 self.write(json.dumps({'graphClients': state}, indent=2, | 410 self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>')) |
422 default=lambda obj: '<unserializable>')) | |
423 except Exception: | 411 except Exception: |
424 import traceback; traceback.print_exc() | 412 import traceback |
413 traceback.print_exc() | |
425 raise | 414 raise |
426 | 415 |
416 | |
427 class GraphList(cyclone.web.RequestHandler): | 417 class GraphList(cyclone.web.RequestHandler): |
418 | |
428 def get(self) -> None: | 419 def get(self) -> None: |
429 self.write(json.dumps(config['streams'])) | 420 self.write(json.dumps(config['streams'])) |
421 | |
430 | 422 |
431 if __name__ == '__main__': | 423 if __name__ == '__main__': |
432 arg = docopt(""" | 424 arg = docopt(""" |
433 Usage: sse_collector.py [options] | 425 Usage: sse_collector.py [options] |
434 | 426 |
439 if arg['-v'] or arg['-i']: | 431 if arg['-v'] or arg['-i']: |
440 enableTwistedLog() | 432 enableTwistedLog() |
441 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) | 433 log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO) |
442 defer.setDebugging(True) | 434 defer.setDebugging(True) |
443 | 435 |
444 | |
445 graphClients = GraphClients() | 436 graphClients = GraphClients() |
446 #exporter = InfluxExporter(... to export some stats values | 437 |
447 | 438 reactor.listenTCP(9072, |
448 reactor.listenTCP( | 439 cyclone.web.Application(handlers=[ |
449 9072, | 440 (r"/()", cyclone.web.StaticFileHandler, { |
450 cyclone.web.Application( | 441 "path": ".", |
451 handlers=[ | 442 "default_filename": "index.html" |
452 (r"/()", cyclone.web.StaticFileHandler, { | 443 }), |
453 "path": ".", "default_filename": "index.html"}), | 444 (r'/state', State), |
454 (r'/state', State), | 445 (r'/graph/', GraphList), |
455 (r'/graph/', GraphList), | 446 (r'/graph/(.+)', PatchSink), |
456 (r'/graph/(.+)', PatchSink), | 447 (r'/metrics', Metrics), |
457 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}), | 448 ], |
458 ], | 449 graphClients=graphClients), |
459 graphClients=graphClients), | 450 interface='::') |
460 interface='::') | |
461 reactor.run() | 451 reactor.run() |