comparison service/collector/collector.py @ 794:fafe86ae0b03

py3, k8s, prometheus updates
author drewp@bigasterisk.com
date Sat, 26 Dec 2020 17:00:08 -0800
parents 6d84cd3eb277
--- service/collector/collector.py  793:c3e3bd5dfa0b
+++ service/collector/collector.py  794:fafe86ae0b03
@@ -5,61 +5,66 @@

 Future:
 - filter out unneeded stmts from the sources
 - give a time resolution and concatenate any patches that come faster than that res
 """
+import collections
+import json
+import logging
+import time
+from typing import (Any, Callable, Dict, List, NewType, Optional, Sequence, Set, Tuple, Union)
+
+import cyclone.sse
+import cyclone.web
 from docopt import docopt
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
-from rdflib import Namespace, URIRef
-
-from typing import TYPE_CHECKING
-if TYPE_CHECKING:
-    from rdflib import StatementType
-else:
-    class StatementType: pass # type: ignore
-
-
-from rdflib.term import Node
-from twisted.internet import reactor, defer
-from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional
-import cyclone.web, cyclone.sse
-import logging, collections, json, time
-
-from standardservice.logsetup import log, enableTwistedLog
 from patchablegraph import jsonFromPatch
+from patchablegraph.patchsource import PatchSource, ReconnectingPatchSource
+from prometheus_client import Counter, Gauge, Histogram, Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
 from rdfdb.patch import Patch
-
-from patchablegraph.patchsource import ReconnectingPatchSource
+from rdflib import Namespace, URIRef
+from rdflib.term import Node, Statement
+from standardservice.logsetup import enableTwistedLog, log
+from twisted.internet import defer, reactor

 from collector_config import config

+
 #SourceUri = NewType('SourceUri', URIRef) # doesn't work
-class SourceUri(URIRef): pass
+class SourceUri(URIRef):
+    pass


 ROOM = Namespace("http://projects.bigasterisk.com/room/")
 COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

-STATS = scales.collection('/root',
-                          scales.PmfStat('getState'),
-                          scales.PmfStat('localStatementsPatch'),
-                          scales.PmfStat('makeSyncPatch'),
-                          scales.PmfStat('onPatch'),
-                          scales.PmfStat('sendUpdatePatch'),
-                          scales.PmfStat('replaceSourceStatements'),
-)
+GET_STATE_CALLS = Summary("get_state_calls", 'calls')
+LOCAL_STATEMENTS_PATCH_CALLS = Summary("local_statements_patch_calls", 'calls')
+MAKE_SYNC_PATCH_CALLS = Summary("make_sync_patch_calls", 'calls')
+ON_PATCH_CALLS = Summary("on_patch_calls", 'calls')
+SEND_UPDATE_PATCH_CALLS = Summary("send_update_patch_calls", 'calls')
+REPLACE_SOURCE_STATEMENTS_CALLS = Summary("replace_source_statements_calls", 'calls')
+
+
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+

 class LocalStatements(object):
     """
     functions that make statements originating from sse_collector itself
     """
+
     def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
         self.applyPatch = applyPatch
         self._sourceState: Dict[SourceUri, URIRef] = {} # source: state URIRef

-    @STATS.localStatementsPatch.time()
+    @LOCAL_STATEMENTS_PATCH_CALLS.time()
     def setSourceState(self, source: SourceUri, state: URIRef):
         """
         add a patch to the COLLECTOR graph about the state of this
         source. state=None to remove the source.
         """
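The hunk above is the heart of the "prometheus updates" in this changeset: the greplin/scales PmfStat collection is replaced by per-function prometheus_client Summary metrics, and a small Metrics handler serves the text exposition format. A minimal standalone sketch of the same pattern, with a made-up metric name and function (only Summary, REGISTRY, and generate_latest are real prometheus_client APIs, the same ones the new code imports):

    import time
    from prometheus_client import REGISTRY, Summary, generate_latest

    DEMO_CALLS = Summary('demo_calls', 'calls')  # hypothetical metric, shaped like GET_STATE_CALLS above

    @DEMO_CALLS.time()  # records call count and total time, like the decorators in this diff
    def demo():
        time.sleep(0.01)

    demo()
    print(generate_latest(REGISTRY).decode('utf8'))  # same text format the new /metrics handler returns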
@@ -79,31 +84,31 @@
                 (COLLECTOR, ROOM['source'], source, COLLECTOR),
                 (source, ROOM['state'], oldState, COLLECTOR),
             ]))
         else:
             self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(
-                addQuads=[
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
                 (source, ROOM['state'], state, COLLECTOR),
-                ],
-                delQuads=[
-                (source, ROOM['state'], oldState, COLLECTOR),
-                ]))
+            ], delQuads=[
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+

 def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
     if isinstance(t, URIRef):
-        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
-                .replace('http://projects.bigasterisk.com/device/', 'dev:')
-                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+        return (t.replace('http://projects.bigasterisk.com/room/', 'room:').replace('http://projects.bigasterisk.com/device/',
+                                                                                     'dev:').replace('http://bigasterisk.com/sse_collector/', 'sc:'))
     return t

-def abbrevStmt(stmt: StatementType) -> str:
-    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]),
-                              abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+
+def abbrevStmt(stmt: Statement) -> str:
+    return '(%s %s %s %s)' % (abbrevTerm(stmt[0]), abbrevTerm(stmt[1]), abbrevTerm(stmt[2]), abbrevTerm(stmt[3]))
+

 class PatchSink(cyclone.sse.SSEHandler):
     _handlerSerial = 0
+
     def __init__(self, application: cyclone.web.Application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)
         self.bound = False
         self.created = time.time()
         self.graphClients = self.settings.graphClients
@@ -118,11 +123,11 @@
     def state(self) -> Dict:
         return {
             'created': round(self.created, 2),
             'ageHours': round((time.time() - self.created) / 3600, 2),
             'streamId': self.streamId,
             'remoteIp': self.request.remote_ip, # wrong, need some forwarded-for thing
             'foafAgent': self.request.headers.get('X-Foaf-Agent'),
             'userAgent': self.request.headers.get('user-agent'),
         }

     def bind(self, *args, **kwargs):
@@ -136,69 +141,65 @@
     def unbind(self) -> None:
         if self.bound:
             self.graphClients.removeSseHandler(self)


-StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set[PatchSink]]]
+StatementTable = Dict[Statement, Tuple[Set[SourceUri], Set[PatchSink]]]


 class PostDeleter(object):
+
     def __init__(self, statements: StatementTable):
         self.statements = statements

     def __enter__(self):
-        self._garbage: List[StatementType] = []
+        self._garbage: List[Statement] = []
         return self

-    def add(self, stmt: StatementType):
+    def add(self, stmt: Statement):
         self._garbage.append(stmt)

     def __exit__(self, type, value, traceback):
         if type is not None:
-            raise
+            raise NotImplementedError()
         for stmt in self._garbage:
             del self.statements[stmt]


 class ActiveStatements(object):
+
     def __init__(self):
         # This table holds statements asserted by any of our sources
         # plus local statements that we introduce (source is
         # http://bigasterisk.com/sse_collector/).
-        self.table: StatementTable = collections.defaultdict(
-            lambda: (set(), set()))
+        self.table: StatementTable = collections.defaultdict(lambda: (set(), set()))

     def state(self) -> Dict:
         return {
             'len': len(self.table),
         }

     def postDeleteStatements(self) -> PostDeleter:
         return PostDeleter(self.table)

     def pprintTable(self) -> None:
-        for i, (stmt, (sources, handlers)) in enumerate(
-                sorted(self.table.items())):
-            print("%03d. %-80s from %s to %s" % (
-                i,
-                abbrevStmt(stmt),
-                [abbrevTerm(s) for s in sources],
-                handlers))
-
-    @STATS.makeSyncPatch.time()
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.table.items())):
+            print("%03d. %-80s from %s to %s" % (i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))
+
+    @MAKE_SYNC_PATCH_CALLS.time()
     def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
         # todo: this could run all handlers at once, which is how we
         # use it anyway
         adds = []
         dels = []

         with self.postDeleteStatements() as garbage:
             for stmt, (stmtSources, handlers) in self.table.items():
                 belongsInHandler = not sources.isdisjoint(stmtSources)
                 handlerHasIt = handler in handlers
-                #log.debug("%s belong=%s has=%s",
+                # log.debug("%s belong=%s has=%s",
                 #           abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                 if belongsInHandler and not handlerHasIt:
                     adds.append(stmt)
                     handlers.add(handler)
                 elif not belongsInHandler and handlerHasIt:
                     dels.append(stmt)
@@ -210,30 +211,27 @@

     def applySourcePatch(self, source: SourceUri, p: Patch):
         for stmt in p.addQuads:
             sourceUrls, handlers = self.table[stmt]
             if source in sourceUrls:
-                raise ValueError("%s added stmt that it already had: %s" %
-                                 (source, abbrevStmt(stmt)))
+                raise ValueError("%s added stmt that it already had: %s" % (source, abbrevStmt(stmt)))
             sourceUrls.add(source)

         with self.postDeleteStatements() as garbage:
             for stmt in p.delQuads:
                 sourceUrls, handlers = self.table[stmt]
                 if source not in sourceUrls:
-                    raise ValueError("%s deleting stmt that it didn't have: %s" %
-                                     (source, abbrevStmt(stmt)))
+                    raise ValueError("%s deleting stmt that it didn't have: %s" % (source, abbrevStmt(stmt)))
                 sourceUrls.remove(source)
                 # this is rare, since some handler probably still has
                 # the stmt we're deleting, but it can happen e.g. when
                 # a handler was just deleted
                 if not sourceUrls and not handlers:
                     garbage.add(stmt)

-    @STATS.replaceSourceStatements.time()
-    def replaceSourceStatements(self, source: SourceUri,
-                                stmts: Sequence[StatementType]):
+    @REPLACE_SOURCE_STATEMENTS_CALLS.time()
+    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[Statement]):
         log.debug('replaceSourceStatements with %s stmts', len(stmts))
         newStmts = set(stmts)

         with self.postDeleteStatements() as garbage:
             for stmt, (sources, handlers) in self.table.items():
@@ -262,46 +260,43 @@
                 sources.discard(source)
                 if not sources and not handlers:
                     garbage.add(stmt)


-
 class GraphClients(object):
     """
     All the active PatchSources and SSEHandlers

     To handle all the overlapping-statement cases, we store a set of
     true statements along with the sources that are currently
     asserting them and the requesters who currently know them. As
     statements come and go, we make patches to send to requesters.
     """
+
     def __init__(self):
         self.clients: Dict[SourceUri, PatchSource] = {} # (COLLECTOR is not listed)
         self.handlers: Set[PatchSink] = set()
         self.statements: ActiveStatements = ActiveStatements()

         self._localStatements = LocalStatements(self._onPatch)

     def state(self) -> Dict:
         return {
-            'clients': sorted([ps.state() for ps in self.clients.values()],
-                              key=lambda r: r['reconnectedPatchSource']['url']),
-            'sseHandlers': sorted([h.state() for h in self.handlers],
-                                  key=lambda r: (r['streamId'], r['created'])),
+            'clients': sorted([ps.state() for ps in self.clients.values()], key=lambda r: r['reconnectedPatchSource']['url']),
+            'sseHandlers': sorted([h.state() for h in self.handlers], key=lambda r: (r['streamId'], r['created'])),
             'statements': self.statements.state(),
         }

     def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
         streamId = handler.streamId
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [
-            COLLECTOR]
-
-    @STATS.onPatch.time()
-    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
+        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
+
+    @ON_PATCH_CALLS.time()
+    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool = False):
         if fullGraph:
             # a reconnect may need to resend the full graph even
             # though we've already sent some statements
             self.statements.replaceSourceStatements(source, p.addQuads)
         else:
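The GraphClients docstring in the hunk above summarizes the bookkeeping that ActiveStatements implements: each currently-true statement maps to the set of sources asserting it and the set of SSE handlers that have already been sent it, and patches for requesters are derived from changes to that table. A rough illustration of the table shape, with made-up URIs (the real keys are rdflib quads and the real handler values are PatchSink instances):

    from rdflib import Namespace, URIRef

    ROOM = Namespace('http://projects.bigasterisk.com/room/')
    stmt = (ROOM['light1'], ROOM['state'], ROOM['on'], ROOM['light1'])  # hypothetical quad
    table = {
        stmt: (
            {URIRef('http://example-source/graph/events')},  # sources currently asserting it (made up)
            set(),  # handlers that have already received it
        ),
    }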
@@ -311,17 +306,14 @@

         if log.isEnabledFor(logging.DEBUG):
             self.statements.pprintTable()

         if source != COLLECTOR:
-            self._localStatements.setSourceState(
-                source,
-                ROOM['fullGraphReceived'] if fullGraph else
-                ROOM['patchesReceived'])
-
-    @STATS.sendUpdatePatch.time()
-    def _sendUpdatePatch(self, handler: Optional[PatchSink]=None):
+            self._localStatements.setSourceState(source, ROOM['fullGraphReceived'] if fullGraph else ROOM['patchesReceived'])
+
+    @SEND_UPDATE_PATCH_CALLS.time()
+    def _sendUpdatePatch(self, handler: Optional[PatchSink] = None):
         """
         send a patch event out this handler to bring it up to date with
         self.statements
         """
         now = time.time()
@@ -345,12 +337,11 @@
                 # This can be a giant line, which was a problem
                 # once. Might be nice for this service to try to break
                 # it up into multiple sends, although there's no
                 # guarantee at all since any single stmt could be any
                 # length.
-                h.sendEvent(message=jsonFromPatch(p).encode('utf8'),
-                            event=b'patch')
+                h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
                 h.lastPatchSentTime = now
             else:
                 log.debug('nothing to send to %s', h)

     def addSseHandler(self, handler: PatchSink):
@@ -363,26 +354,23 @@

         for source in sources:
             if source not in self.clients and source != COLLECTOR:
                 log.debug('connect to patch source %s', source)
                 self._localStatements.setSourceState(source, ROOM['connect'])
-                self.clients[source] = ReconnectingPatchSource(
-                    source,
-                    listener=lambda p, fullGraph, source=source: self._onPatch(
-                        source, p, fullGraph),
-                    reconnectSecs=10)
+                self.clients[source] = ReconnectingPatchSource(source,
+                                                               listener=lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph),
+                                                               reconnectSecs=10)
         log.debug('bring new client up to date')

         self._sendUpdatePatch(handler)

     def removeSseHandler(self, handler: PatchSink):
         log.info('removeSseHandler %r', handler)
         self.statements.discardHandler(handler)
         for source in self._sourcesForHandler(handler):
             for otherHandler in self.handlers:
-                if (otherHandler != handler and
-                        source in self._sourcesForHandler(otherHandler)):
+                if (otherHandler != handler and source in self._sourcesForHandler(otherHandler)):
                     # still in use
                     break
             else:
                 self._stopClient(source)

@@ -412,23 +400,27 @@
                 if not sources and not any(h in self.handlers for h in handlers):
                     garbage.add(stmt)


 class State(cyclone.web.RequestHandler):
-    @STATS.getState.time()
+
+    @GET_STATE_CALLS.time()
     def get(self) -> None:
         try:
             state = self.settings.graphClients.state()
-            self.write(json.dumps({'graphClients': state}, indent=2,
-                                  default=lambda obj: '<unserializable>'))
+            self.write(json.dumps({'graphClients': state}, indent=2, default=lambda obj: '<unserializable>'))
         except Exception:
-            import traceback; traceback.print_exc()
+            import traceback
+            traceback.print_exc()
             raise

+
 class GraphList(cyclone.web.RequestHandler):
+
     def get(self) -> None:
         self.write(json.dumps(config['streams']))
+

 if __name__ == '__main__':
     arg = docopt("""
     Usage: sse_collector.py [options]

@@ -439,23 +431,21 @@
     if arg['-v'] or arg['-i']:
         enableTwistedLog()
         log.setLevel(logging.DEBUG if arg['-v'] else logging.INFO)
         defer.setDebugging(True)

-
     graphClients = GraphClients()
-    #exporter = InfluxExporter(... to export some stats values
-
-    reactor.listenTCP(
-        9072,
-        cyclone.web.Application(
-            handlers=[
-                (r"/()", cyclone.web.StaticFileHandler, {
-                    "path": ".", "default_filename": "index.html"}),
-                (r'/state', State),
-                (r'/graph/', GraphList),
-                (r'/graph/(.+)', PatchSink),
-                (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
-            ],
-            graphClients=graphClients),
-        interface='::')
+
+    reactor.listenTCP(9072,
+                      cyclone.web.Application(handlers=[
+                          (r"/()", cyclone.web.StaticFileHandler, {
+                              "path": ".",
+                              "default_filename": "index.html"
+                          }),
+                          (r'/state', State),
+                          (r'/graph/', GraphList),
+                          (r'/graph/(.+)', PatchSink),
+                          (r'/metrics', Metrics),
+                      ],
+                      graphClients=graphClients),
+                      interface='::')
     reactor.run()
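With the routing change in this final hunk, the old greplin /stats/ endpoint is gone and Prometheus scrapes /metrics instead; the service still listens on port 9072 on '::'. Assuming a locally running collector, a quick check of the new endpoints might look like:

    import urllib.request

    # port and paths are the ones registered in the Application above
    print(urllib.request.urlopen('http://localhost:9072/metrics').read().decode('utf8'))  # Prometheus text exposition
    print(urllib.request.urlopen('http://localhost:9072/state').read().decode('utf8'))    # JSON snapshot of clients, handlers, statements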