Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 451:17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
Ignore-this: df20acbf7ec27226f3060f3b5a4c710b
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 Apr 2019 01:08:01 -0700 |
parents | ef7eba0551f2 |
children | 91ab9f926aa1 |
rev | line source |
---|---|
296 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
296 | 9 """ |
10 from docopt import docopt | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
11 from greplin import scales |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
12 from greplin.scales.cyclonehandler import StatsHandler |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
13 from rdflib import Namespace, URIRef, StatementType |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
14 from rdflib.term import Node |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
15 from twisted.internet import reactor, defer |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
16 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
17 import cyclone.web, cyclone.sse |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
18 import logging, collections, json, time |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
19 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
20 from logsetup import log, enableTwistedLog |
302 | 21 from patchablegraph import jsonFromPatch |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
22 from rdfdb.patch import Patch |
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
23 |
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
24 # workaround for broken import in twisted_sse_demo/eventsourcee.py |
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
25 import sys; sys.path.append('twisted_sse_demo') |
302 | 26 from patchsource import ReconnectingPatchSource |
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
27 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
28 from sse_collector_config import config |
302 | 29 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
30 #SourceUri = NewType('SourceUri', URIRef) # doesn't work |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
class SourceUri(URIRef):
    """A URIRef subtype used for the URIs of patch sources (and COLLECTOR)."""
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
32 |
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
33 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
# Vocabulary namespace for the predicates/objects this service emits,
# e.g. ROOM['source'] and ROOM['state'].
ROOM = Namespace("http://projects.bigasterisk.com/room/")

# The source (and graph) URI used for statements that this service
# introduces itself.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# One PmfStat per timed operation; they are applied in this file as
# @STATS.<name>.time() decorators.
STATS = scales.collection(
    '/root',
    *[scales.PmfStat(statName) for statName in (
        'getState',
        'localStatementsPatch',
        'makeSyncPatch',
        'onPatch',
        'sendUpdatePatch',
        'replaceSourceStatements',
    )])
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
45 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
class LocalStatements(object):
    """
    Produces the statements that sse_collector itself asserts: which
    sources it has and the current state of each one.
    """
    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
        # applyPatch is called as applyPatch(COLLECTOR, patch) for every
        # change made by setSourceState.
        self.applyPatch = applyPatch
        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source: SourceUri, state: URIRef):
        """
        Remember `state` for `source` and patch the COLLECTOR graph so
        it says the same thing. state=None removes the source. Nothing
        happens when the state is unchanged.
        """
        previous = self._sourceState.get(source, None)
        if state == previous:
            return
        log.info('source state %s -> %s', source, state)

        # (COLLECTOR room:source <source>) exists for as long as the
        # source has any state.
        membershipQuad = (COLLECTOR, ROOM['source'], source, COLLECTOR)

        def stateQuad(value):
            return (source, ROOM['state'], value, COLLECTOR)

        if state is None:
            # previous is not None here, or we'd have returned above
            del self._sourceState[source]
            patch = Patch(delQuads=[membershipQuad, stateQuad(previous)])
        else:
            self._sourceState[source] = state
            if previous is None:
                # first time we hear about this source
                patch = Patch(addQuads=[membershipQuad, stateQuad(state)])
            else:
                # known source: swap the old state statement for the new one
                patch = Patch(addQuads=[stateQuad(state)],
                              delQuads=[stateQuad(previous)])

        self.applyPatch(COLLECTOR, patch)
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
85 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten a URIRef for display by swapping well-known URI prefixes for
    'room:', 'dev:' and 'sc:'. Anything that isn't a URIRef is returned
    untouched.
    """
    if not isinstance(t, URIRef):
        return t

    prefixes = (
        ('http://projects.bigasterisk.com/room/', 'room:'),
        ('http://projects.bigasterisk.com/device/', 'dev:'),
        ('http://bigasterisk.com/sse_collector/', 'sc:'),
    )
    shortened = t
    for longForm, shortForm in prefixes:
        shortened = shortened.replace(longForm, shortForm)
    return shortened
92 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def abbrevStmt(stmt: StatementType) -> str:
    """
    Format the first four terms of stmt as '(s p o g)', each one
    shortened with abbrevTerm.
    """
    terms = [abbrevTerm(term) for term in stmt]
    subj, pred, obj, ctx = terms[0], terms[1], terms[2], terms[3]
    return '(%s %s %s %s)' % (subj, pred, obj, ctx)
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
96 |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
# Maps each known statement to a pair of sets: (sources currently
# asserting it, handlers that have already been sent it).
# 'SomeGraph' is quoted because that name is not defined or imported
# anywhere above this line, so an unquoted reference would raise
# NameError as soon as this module-level assignment runs.
StatementTable = Dict[StatementType, Tuple[Set[SourceUri], Set['SomeGraph']]]
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
98 |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
99 |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
class PostDeleter(object):
    """
    Context manager that collects statement keys to be removed from a
    statement table and removes them only when the `with` block ends, so
    the table can be iterated inside the block without changing size.

    If the block raises, nothing is removed and the exception
    propagates to the caller.
    """
    # Annotations are quoted so they are not evaluated when the class
    # body runs.
    def __init__(self, statements: 'StatementTable'):
        self.statements = statements
        # Created here (and reset on every __enter__) so that add() can
        # never hit a missing attribute.
        self._garbage: List['StatementType'] = []

    def __enter__(self):
        self._garbage = []
        return self

    def add(self, stmt: 'StatementType'):
        """Mark stmt for removal when the `with` block exits cleanly."""
        self._garbage.append(stmt)

    def __exit__(self, type, value, traceback):
        if type is not None:
            # A false return value tells Python to keep propagating the
            # in-flight exception; no bare `raise` is needed.
            return False
        for stmt in self._garbage:
            # pop() rather than del: a stmt that was marked twice, or is
            # already gone, must not turn cleanup into a KeyError.
            self.statements.pop(stmt, None)
        return False
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
116 |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
117 |
class ActiveStatements(object):
    """
    Table of every statement currently asserted by some source and/or
    known to some handler, with operations that keep it in step with
    incoming source patches and with what each handler has been sent.
    """
    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.statements: StatementTable = collections.defaultdict(lambda: (set(), set()))

    def state(self) -> Dict:
        """Return a small summary dict (currently just the table size)."""
        return {
            'len': len(self.statements),
        }

    def _postDeleteStatements(self) -> PostDeleter:
        """
        Return a context manager that removes marked statements from the
        table after the `with` block, so the table isn't resized while
        it is being iterated.
        """
        return PostDeleter(self.statements)

    def pprintTable(self) -> None:
        """Print every table row (statement, its sources, its handlers)."""
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
            print("%03d. %-80s from %s to %s" % (
                i,
                abbrevStmt(stmt),
                [abbrevTerm(s) for s in sources],
                handlers))

    @STATS.makeSyncPatch.time()
    def makeSyncPatch(self, handler: 'SomeGraph', sources: Set[SourceUri]):
        """
        Return the Patch that brings `handler` up to date: it adds every
        statement asserted by one of `sources` that the handler doesn't
        have yet and deletes every statement the handler has that none
        of `sources` asserts any more. The table is updated as if the
        patch had been delivered.

        `sources` may be any iterable of source URIs, not only a set.
        """
        # todo: this could run all handlers at once, which is how we use it anyway
        adds = []
        dels = []

        # Normalize once so a list argument works; list has no
        # isdisjoint().
        wantedSources = frozenset(sources)

        with self._postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.statements.items():
                belongsInHandler = not wantedSources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                # A row with no sources and no handlers carries no
                # information; drop it after the loop.
                if not handlers and not stmtSources:
                    garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source: SourceUri, p: Patch):
        """
        Apply an incremental patch from `source` to the table.

        Raises ValueError if the source adds a statement it already
        asserts or deletes one it does not assert.
        """
        for stmt in p.addQuads:
            sourceUrls, handlers = self.statements[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self._postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get() so that a delete of an unknown stmt doesn't make
                # the defaultdict insert an empty row before we raise.
                row = self.statements.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" %
                                     (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @STATS.replaceSourceStatements.time()
    def replaceSourceStatements(self, source: SourceUri, stmts: Sequence[StatementType]):
        """
        Make `stmts` the complete set of statements asserted by
        `source`, dropping the source from any other statement.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        # Membership is tested once per table row; use a set instead of
        # scanning the sequence each time.
        wanted = frozenset(stmts)
        # Statements not seen in the table yet; whatever is left after
        # the loop is new and gets added below.
        newStmts = set(wanted)

        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                if source in sources:
                    if stmt not in wanted:
                        sources.remove(source)
                        if not sources and not handlers:
                            garbage.add(stmt)
                else:
                    if stmt in wanted:
                        sources.add(source)
                newStmts.discard(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler: 'SomeGraph'):
        """Forget `handler` in every row and drop rows left empty."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source: SourceUri):
        """Forget `source` in every row and drop rows left empty."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
216 |
301 | 217 |
218 class GraphClients(object): | |
219 """ | |
220 All the active PatchSources and SSEHandlers | |
221 | |
222 To handle all the overlapping-statement cases, we store a set of | |
223 true statements along with the sources that are currently | |
224 asserting them and the requesters who currently know them. As | |
225 statements come and go, we make patches to send to requesters. | |
226 """ | |
def __init__(self):
    """Start with no patch sources, no handlers and an empty table."""
    # Statements made by this service go through _onPatch, the same
    # entry point that is used for patches from sources.
    self._localStatements = LocalStatements(self._onPatch)

    self.statements: ActiveStatements = ActiveStatements()
    self.handlers: Set[SomeGraph] = set()
    self.clients: Dict[SourceUri, PatchSource] = {}  # (COLLECTOR is not listed)
233 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def state(self) -> Dict:
    """
    Return a dict with the state() of every patch source, every SSE
    handler, and the statement table.
    """
    clientStates = [ps.state() for ps in self.clients.values()]
    handlerStates = [h.state() for h in self.handlers]
    return {
        'clients': clientStates,
        'sseHandlers': handlerStates,
        'statements': self.statements.state(),
    }
240 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def _sourcesForHandler(self, handler: 'SomeGraph') -> List[SourceUri]:
    """
    Return the source URIs configured for handler.streamId, followed by
    COLLECTOR.

    Raises ValueError unless exactly one entry of config['streams'] has
    that id.
    """
    # 'SomeGraph' is quoted: the name isn't defined above this point in
    # the file, and an unquoted parameter annotation is evaluated as
    # soon as the def statement runs.
    streamId = handler.streamId
    matches = [s for s in config['streams'] if s['id'] == streamId]
    if len(matches) != 1:
        raise ValueError("%s matches for %r" % (len(matches), streamId))
    return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [COLLECTOR]
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
247 |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
@STATS.onPatch.time()
def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
    """
    Take a patch from `source` into the statement table, push the
    resulting updates out to the handlers, and (for sources other than
    COLLECTOR) record what kind of data the source just delivered.
    """
    if not fullGraph:
        self.statements.applySourcePatch(source, p)
    else:
        # a reconnect may need to resend the full graph even
        # though we've already sent some statements
        self.statements.replaceSourceStatements(source, p.addQuads)

    self._sendUpdatePatch()

    if log.isEnabledFor(logging.DEBUG):
        self.statements.pprintTable()

    # COLLECTOR's own patches don't get a source-state statement
    if source == COLLECTOR:
        return

    if fullGraph:
        newState = ROOM['fullGraphReceived']
    else:
        newState = ROOM['patchesReceived']
    self._localStatements.setSourceState(source, newState)
267 | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
@STATS.sendUpdatePatch.time()
def _sendUpdatePatch(self, handler: Optional['SomeGraph']=None):
    """
    send a patch event out to `handler` (or to every handler in
    self.handlers when handler is None) to bring it up to date with
    self.statements

    A handler that was sent a patch less than `period` seconds ago is
    skipped on this call.
    """
    # 'SomeGraph' is quoted: the name isn't defined above this point in
    # the file, and an unquoted parameter annotation is evaluated as
    # soon as the def statement runs.
    now = time.time()
    # reduce loops here- prepare all patches at once
    for h in (self.handlers if handler is None else [handler]):
        # minimum seconds between patches to one handler; clients whose
        # user-agent mentions Raspbian get a longer interval
        period = 1
        if 'Raspbian' in h.request.headers.get('user-agent', ''):
            period = 5
        if h.lastPatchSentTime > now - period:
            continue
        # _sourcesForHandler returns a list; makeSyncPatch is declared
        # to take a set and uses set operations on it.
        p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
        log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
        if not p.isNoop():
            log.debug("send patch %s to %s", p.shortSummary(), h)
            # This can be a giant line, which was a problem once. Might be
            # nice for this service to try to break it up into multiple sends,
            # although there's no guarantee at all since any single stmt
            # could be any length.
            h.sendEvent(message=jsonFromPatch(p).encode('utf8'), event=b'patch')
            h.lastPatchSentTime = now
        else:
            log.debug('nothing to send to %s', h)
306 | 294 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def addSseHandler(self, handler: 'SomeGraph'):
    """Register a newly connected SSE handler and bring it up to date.

    Steps, in order:
      1. Resolve the handler's sources with self._sourcesForHandler.
      2. Add the handler to self.handlers.
      3. For every source that has no entry in self.clients yet (the
         COLLECTOR pseudo-source excepted), mark it ROOM['connect'] in
         the local statements and open a ReconnectingPatchSource whose
         patches are forwarded to self._onPatch.
      4. Send a 'greets' event, then an initial sync patch via
         self._sendUpdatePatch.

    The annotation is quoted because SomeGraph is defined further down
    the module; an unquoted name would be evaluated when this method is
    defined and fail with NameError.

    NOTE(review): if anything after step 2 raises, the handler stays in
    self.handlers; SomeGraph.bind never sets `bound`, so unbind will not
    remove it. Confirm whether a rollback is wanted here.
    """
    log.info('addSseHandler %r %r', handler, handler.streamId)

    # fail early if id doesn't match
    sources = self._sourcesForHandler(handler)

    self.handlers.add(handler)

    for source in sources:
        if source in self.clients or source == COLLECTOR:
            continue
        log.debug('connect to patch source %s', source)
        self._localStatements.setSourceState(source, ROOM['connect'])
        self.clients[source] = ReconnectingPatchSource(
            source,
            # source is bound as a default argument so each listener
            # keeps its own source rather than the loop's last value.
            listener=lambda p, fullGraph, source=source: self._onPatch(
                source, p, fullGraph),
            reconnectSecs=10)

    log.debug('bring new client up to date')
    # bytes for both fields, matching the sendEvent call that delivers
    # patches elsewhere in this file.
    handler.sendEvent(message=b'hello', event=b'greets')

    self._sendUpdatePatch(handler)
296 | 316 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def removeSseHandler(self, handler: 'SomeGraph'):
    """Unregister an SSE handler and stop sources nobody else uses.

    The handler is first discarded from self.statements. Then, for each
    of its sources, the client is stopped (self._stopClient) unless some
    other handler in self.handlers also lists that source. Finally the
    handler is removed from self.handlers.

    The annotation is quoted because SomeGraph is defined further down
    the module; an unquoted name would be evaluated when this method is
    defined and fail with NameError.
    """
    log.info('removeSseHandler %r', handler)
    self.statements.discardHandler(handler)

    for source in self._sourcesForHandler(handler):
        # handler itself is still in self.handlers at this point, so it
        # must be excluded from the "who else needs this?" check.
        stillWanted = any(
            other != handler and source in self._sourcesForHandler(other)
            for other in self.handlers)
        if not stillWanted:
            self._stopClient(source)

    self.handlers.remove(handler)
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
329 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def _stopClient(self, url: SourceUri):
    """Stop the patch source for `url` and forget everything from it.

    Does nothing for the COLLECTOR pseudo-source. Otherwise stops the
    client registered under `url` (if there is one), discards the
    source from self.statements, clears its local source state, and
    drops the entry from self.clients.

    A url with no registered client is tolerated: the remaining cleanup
    still runs instead of being aborted by a KeyError, which matches the
    guarded delete at the end.
    """
    if url == COLLECTOR:
        return

    if url in self.clients:
        self.clients[url].stop()

    self.statements.discardSource(url)

    self._localStatements.setSourceState(url, None)
    if url in self.clients:
        del self.clients[url]
301 | 341 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
342 |
class SomeGraph(cyclone.sse.SSEHandler):
    """SSE request handler for one subscriber of a /graph/... stream.

    On bind it registers itself with the application's graphClients
    object; on unbind it unregisters, but only if registration finished.
    """

    # Next serial number to hand out; shared by all instances.
    _handlerSerial = 0

    def __init__(self, application: cyclone.web.Application, request):
        """Set up bookkeeping fields and take the next handler serial."""
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # Becomes True only after addSseHandler has returned in bind().
        self.bound = False
        self.created = time.time()
        self.graphClients = self.settings.graphClients

        serial = SomeGraph._handlerSerial
        SomeGraph._handlerSerial = serial + 1
        self._serial = serial
        # Time of the last patch sent to this handler; the patch-sending
        # code compares it with the current time to space out sends.
        self.lastPatchSentTime: float = 0.0

    def __repr__(self) -> str:
        """Short label containing this handler's serial number."""
        return '<Handler #%s>' % self._serial

    def state(self) -> Dict:
        """Return a summary dict describing this connection.

        Reads self.streamId, which is only assigned in bind().
        """
        ageSeconds = time.time() - self.created
        request = self.request
        return {
            'created': round(self.created, 2),
            'ageHours': round(ageSeconds / 3600, 2),
            'streamId': self.streamId,
            'remoteIp': request.remote_ip,
            'userAgent': request.headers.get('user-agent'),
        }

    def bind(self, *args, **kwargs):
        """Record the stream id (first positional arg) and register.

        Presumably args[0] is the group captured by the /graph/(.*)
        route -- TODO confirm against cyclone's SSEHandler.
        """
        streamId = args[0]
        self.streamId = streamId

        self.graphClients.addSseHandler(self)
        # If something goes wrong with addSseHandler, I don't want to
        # try removeSseHandler.
        self.bound = True

    def unbind(self) -> None:
        """Unregister from graphClients if bind() completed."""
        if not self.bound:
            return
        self.graphClients.removeSseHandler(self)
296 | 378 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
379 |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """Serves the collector's current state as indented JSON."""

    @STATS.getState.time()
    def get(self) -> None:
        """Write {'graphClients': <graphClients.state()>} as JSON.

        If gathering the state fails, the traceback is printed and the
        exception is re-raised unchanged.
        """
        try:
            state = self.settings.graphClients.state()
        except Exception:
            # Print the traceback here, then let the error propagate.
            import traceback
            traceback.print_exc()
            raise

        self.write(json.dumps({'graphClients': state}, indent=2))
313 | 390 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
391 |
class Root(cyclone.web.RequestHandler):
    """Handler for the site root; serves a fixed placeholder page."""

    def get(self) -> None:
        """Write the static HTML page that names this service."""
        page = '<html><body>sse_collector</body></html>'
        self.write(page)
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
395 |
306 | 396 |
if __name__ == '__main__':
    # Script entry point: parse options, build the shared GraphClients
    # object, publish the HTTP routes on port 9072 and run the reactor.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    """)

    verbose = arg['-v']
    if verbose:
        enableTwistedLog()
        log.setLevel(logging.DEBUG)
        defer.setDebugging(True)

    graphClients = GraphClients()
    #exporter = InfluxExporter(... to export some stats values

    routes = [
        (r'/', Root),
        (r'/state', State),
        (r'/graph/(.*)', SomeGraph),
        (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
    ]
    # graphClients is passed as an application setting; the handlers
    # read it back through self.settings.graphClients.
    application = cyclone.web.Application(
        handlers=routes,
        graphClients=graphClients)
    reactor.listenTCP(9072, application, interface='::')
    reactor.run()