Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 595:7fd9fa5d33aa
standardize build. fix /state report
Ignore-this: 5a9bc82de9f0d7398c9290fc2c7ecbf9
author | drewp@bigasterisk.com |
---|---|
date | Sat, 06 Jul 2019 13:56:07 -0700 |
parents | 91ab9f926aa1 |
children | 22751570eda1 |
rev | line source |
---|---|
296 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
8 - give a time resolution and concatenate any patches that come faster than that res |
296 | 9 """ |
10 from docopt import docopt | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
11 from greplin import scales |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
12 from greplin.scales.cyclonehandler import StatsHandler |
470 | 13 from rdflib import Namespace, URIRef |
14 | |
15 from typing import TYPE_CHECKING | |
16 if TYPE_CHECKING: | |
17 from rdflib import StatementType | |
18 else: | |
19 class StatementType: pass # type: ignore | |
20 | |
21 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
22 from rdflib.term import Node |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
23 from twisted.internet import reactor, defer |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
24 from typing import Callable, Dict, NewType, Tuple, Union, Any, Sequence, Set, List, Optional |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
25 import cyclone.web, cyclone.sse |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
26 import logging, collections, json, time |
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
27 |
595 | 28 from standardservice.logsetup import log, enableTwistedLog |
302 | 29 from patchablegraph import jsonFromPatch |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
30 from rdfdb.patch import Patch |
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
31 |
595 | 32 from patchablegraph.patchsource import ReconnectingPatchSource |
449
ef7eba0551f2
collector partial py3+types update. WIP
drewp@bigasterisk.com
parents:
446
diff
changeset
|
33 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
34 from sse_collector_config import config |
302 | 35 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
# Note from the original author: SourceUri = NewType('SourceUri', URIRef)
# "doesn't work", so SourceUri is an empty subclass of URIRef instead.
class SourceUri(URIRef):
    """URI identifying a statement source: an upstream patch stream, or COLLECTOR."""


ROOM = Namespace("http://projects.bigasterisk.com/room/")

# Pseudo-source for statements this service injects about itself.
COLLECTOR = SourceUri(URIRef('http://bigasterisk.com/sse_collector/'))

# greplin.scales timing stats; one PmfStat per decorated operation, in the
# same order as before.
STATS = scales.collection(
    '/root',
    *[scales.PmfStat(statName) for statName in (
        'getState',
        'localStatementsPatch',
        'makeSyncPatch',
        'onPatch',
        'sendUpdatePatch',
        'replaceSourceStatements',
    )])
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
51 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself

    Tracks one state URIRef per source and publishes it into the COLLECTOR
    graph by calling applyPatch(COLLECTOR, patch).
    """
    def __init__(self, applyPatch: Callable[[URIRef, Patch], None]):
        # Callback that receives (graph source, Patch); GraphClients passes
        # its _onPatch here.
        self.applyPatch = applyPatch
        self._sourceState: Dict[SourceUri, URIRef] = {}  # source: state URIRef

    @STATS.localStatementsPatch.time()
    def setSourceState(self, source: SourceUri, state: Optional[URIRef]):
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.

        No patch is emitted when the state is unchanged. A first state adds
        both the (COLLECTOR room:source source) and (source room:state state)
        quads; None deletes both; any other change swaps the room:state quad.
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)
        if oldState is None:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
        elif state is None:
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
        else:
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], oldState, COLLECTOR),
                ]))
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
91 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def abbrevTerm(t: Union[URIRef, Node]) -> Union[str, Node]:
    """
    Shorten a term for log output.

    For a URIRef, occurrences of the room, device and sse_collector
    namespaces are replaced by 'room:', 'dev:' and 'sc:' (in that order) and
    a plain str is returned. Any other term is returned unchanged.
    """
    if not isinstance(t, URIRef):
        return t
    shortened: str = t
    for longForm, shortForm in (
            ('http://projects.bigasterisk.com/room/', 'room:'),
            ('http://projects.bigasterisk.com/device/', 'dev:'),
            ('http://bigasterisk.com/sse_collector/', 'sc:')):
        shortened = shortened.replace(longForm, shortForm)
    return shortened
98 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
def abbrevStmt(stmt: StatementType) -> str:
    """Format the first four terms of a quad as '(s p o g)', each via abbrevTerm."""
    parts = tuple(abbrevTerm(stmt[i]) for i in range(4))
    return '(%s %s %s %s)' % parts
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
102 |
class PatchSink(cyclone.sse.SSEHandler):
    """
    One SSE client of /graph/<streamId>. On bind it registers with the
    shared GraphClients, which then sends it 'patch' events; on unbind it
    unregisters.
    """
    _handlerSerial = 0

    def __init__(self, application: cyclone.web.Application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # True only while registered with graphClients.
        self.bound = False
        self.created = time.time()
        self.graphClients = self.settings.graphClients

        # Sequential id, used only for __repr__.
        self._serial = PatchSink._handlerSerial
        PatchSink._handlerSerial += 1
        # time.time() of the last non-empty patch sent; GraphClients uses
        # it to rate-limit sends.
        self.lastPatchSentTime: float = 0.0

    def __repr__(self) -> str:
        return '<Handler #%s>' % self._serial

    def state(self) -> Dict:
        """Debug summary of this connection (for the /state page)."""
        return {
            'created': round(self.created, 2),
            'ageHours': round((time.time() - self.created) / 3600, 2),
            'streamId': self.streamId,
            'remoteIp': self.request.remote_ip,
            'userAgent': self.request.headers.get('user-agent'),
        }

    def bind(self, *args, **kwargs):
        # args[0] is the (.*) capture from the /graph/(.*) route.
        self.streamId = args[0]

        self.graphClients.addSseHandler(self)
        # If something goes wrong with addSseHandler, I don't want to
        # try removeSseHandler.
        self.bound = True

    def unbind(self) -> None:
        if self.bound:
            # Clear the flag first so a repeated unbind is a no-op instead
            # of asking graphClients to remove this handler twice.
            self.bound = False
            self.graphClients.removeSseHandler(self)
296 | 138 |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
139 |
# One row per statement: (sources currently asserting it, handlers that
# currently have it).
_StatementRow = Tuple[Set[SourceUri], Set[PatchSink]]
StatementTable = Dict[StatementType, _StatementRow]
141 | |
142 | |
class PostDeleter(object):
    """
    Context manager that collects keys to drop from a statement table and
    deletes them only when the with-block exits normally. Callers mark rows
    with add() while iterating over the table, which must not be resized
    mid-iteration.
    """
    def __init__(self, statements: 'StatementTable'):
        self.statements = statements
        self._garbage: List[StatementType] = []

    def __enter__(self):
        self._garbage = []
        return self

    def add(self, stmt: 'StatementType'):
        """Mark stmt for deletion when the with-block finishes."""
        self._garbage.append(stmt)

    def __exit__(self, excType, excValue, tb):
        if excType is not None:
            # Leave the table untouched. Returning False lets the original
            # exception propagate; __exit__ must not re-raise it itself.
            return False
        for stmt in self._garbage:
            # pop(), not del: a key marked twice is deleted once, no KeyError.
            self.statements.pop(stmt, None)
        return False
159 | |
160 | |
class ActiveStatements(object):
    """
    The statement table: for each statement, which sources assert it and
    which handlers have been sent it.
    """
    def __init__(self):
        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.table: StatementTable = collections.defaultdict(
            lambda: (set(), set()))

    def state(self) -> Dict:
        return {
            'len': len(self.table),
        }

    def postDeleteStatements(self) -> PostDeleter:
        return PostDeleter(self.table)

    def pprintTable(self) -> None:
        for i, (stmt, (sources, handlers)) in enumerate(
                sorted(self.table.items())):
            print("%03d. %-80s from %s to %s" % (
                i,
                abbrevStmt(stmt),
                [abbrevTerm(s) for s in sources],
                handlers))

    @STATS.makeSyncPatch.time()
    def makeSyncPatch(self, handler: PatchSink, sources: Set[SourceUri]):
        """
        Build the patch that brings `handler` in line with the statements
        asserted by any of `sources`, and record that it now has them.
        """
        # todo: this could run all handlers at once, which is how we
        # use it anyway
        adds = []
        dels = []

        with self.postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.table.items():
                belongsInHandler = not sources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                # A row nobody asserts and nobody holds is useless; check
                # every row, not only the one just emptied.
                if not handlers and not stmtSources:
                    garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source: SourceUri, p: Patch):
        """
        Record p's additions/deletions as asserted/retracted by source.

        Raises ValueError if source re-adds a statement it already asserts
        or deletes one it does not assert.
        """
        for stmt in p.addQuads:
            sourceUrls = self.table[stmt][0]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self.postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get, not [], so a bogus delete doesn't create an empty
                # row in the defaultdict before we raise.
                row = self.table.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" %
                                     (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    @STATS.replaceSourceStatements.time()
    def replaceSourceStatements(self, source: SourceUri,
                                stmts: Sequence[StatementType]):
        """
        Make `stmts` exactly the set of statements asserted by `source`.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        wanted = set(stmts)  # set membership; stmts may be a long list
        newStmts = set(wanted)  # wanted stmts not yet in the table

        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                if stmt in wanted:
                    # Already in the table: attribute it to source (a no-op
                    # if it already was) so it isn't re-added below.
                    sources.add(source)
                    newStmts.discard(stmt)
                elif source in sources:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler: PatchSink):
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source: SourceUri):
        with self.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.table.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)
264 | |
265 | |
266 | |
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        # One upstream connection per source URI (COLLECTOR is not listed).
        self.clients: Dict[SourceUri, ReconnectingPatchSource] = {}
        self.handlers: Set[PatchSink] = set()
        self.statements: ActiveStatements = ActiveStatements()
        # handler -> reactor delayed call retrying a rate-limited send
        self._pendingSends: Dict[PatchSink, Any] = {}

        self._localStatements = LocalStatements(self._onPatch)

    def state(self) -> Dict:
        return {
            'clients': [ps.state() for ps in self.clients.values()],
            'sseHandlers': [h.state() for h in self.handlers],
            'statements': self.statements.state(),
        }

    def _sourcesForHandler(self, handler: PatchSink) -> List[SourceUri]:
        """
        Sources configured for handler's stream id, plus COLLECTOR.

        Raises ValueError unless exactly one config['streams'] entry has
        that id.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [SourceUri(URIRef(s)) for s in matches[0]['sources']] + [
            COLLECTOR]

    @STATS.onPatch.time()
    def _onPatch(self, source: SourceUri, p: Patch, fullGraph: bool=False):
        """
        Fold a patch from source into self.statements, push updates to the
        handlers, and record the source's state in the COLLECTOR graph.
        With fullGraph=True, p.addQuads is the source's complete graph.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    @STATS.sendUpdatePatch.time()
    def _sendUpdatePatch(self, handler: Optional[PatchSink]=None):
        """
        send a patch event out this handler (or every handler, when handler
        is None) to bring it up to date with self.statements

        A handler that was sent a patch less than its period ago gets one
        delayed retry instead of being skipped. Otherwise the last change in
        a burst would not reach it until some later patch arrived.
        """
        now = time.time()
        selected = self.handlers
        if handler is not None:
            if handler not in self.handlers:
                log.error("called _sendUpdatePatch on a handler that's gone")
                return
            selected = {handler}
        # reduce loops here- prepare all patches at once
        for h in selected:
            period = .9
            if 'Raspbian' in h.request.headers.get('user-agent', ''):
                period = 5
            wait = h.lastPatchSentTime + period - now
            if wait > 0:
                self._scheduleSend(h, wait)
                continue
            p = self.statements.makeSyncPatch(h, set(self._sourcesForHandler(h)))
            if log.isEnabledFor(logging.DEBUG):
                # only build p.jsonRepr when it is actually going to be logged
                log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr)
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                # This can be a giant line, which was a problem
                # once. Might be nice for this service to try to break
                # it up into multiple sends, although there's no
                # guarantee at all since any single stmt could be any
                # length.
                h.sendEvent(message=jsonFromPatch(p).encode('utf8'),
                            event=b'patch')
                h.lastPatchSentTime = now
            else:
                log.debug('nothing to send to %s', h)

    def _scheduleSend(self, handler: PatchSink, delay: float) -> None:
        """Queue at most one delayed _sendUpdatePatch for a rate-limited handler."""
        if handler in self._pendingSends:
            # A retry is already queued; it syncs to the table as of then.
            return

        def retry():
            self._pendingSends.pop(handler, None)
            if handler in self.handlers:
                self._sendUpdatePatch(handler)

        self._pendingSends[handler] = reactor.callLater(delay, retry)

    def addSseHandler(self, handler: PatchSink):
        """Register handler, connect any sources it needs, and send it the current graph."""
        log.info('addSseHandler %r %r', handler, handler.streamId)

        # fail early if id doesn't match
        sources = self._sourcesForHandler(handler)

        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                log.debug('connect to patch source %s', source)
                self._localStatements.setSourceState(source, ROOM['connect'])
                self.clients[source] = ReconnectingPatchSource(
                    source,
                    listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph),
                    reconnectSecs=10)
        log.debug('bring new client up to date')

        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler: PatchSink):
        """Unregister handler and stop any source no remaining handler uses."""
        log.info('removeSseHandler %r', handler)
        # Drop the handler from self.handlers before stopping clients.
        # _stopClient -> setSourceState -> _onPatch -> _sendUpdatePatch
        # would otherwise still target it: makeSyncPatch would re-add it to
        # rows that discardHandler had just cleaned.
        self.handlers.discard(handler)
        pending = self._pendingSends.pop(handler, None)
        if pending is not None and pending.active():
            pending.cancel()
        self.statements.discardHandler(handler)

        for source in self._sourcesForHandler(handler):
            stillUsed = any(source in self._sourcesForHandler(other)
                            for other in self.handlers)
            if not stillUsed:
                self._stopClient(source)

    def _stopClient(self, url: SourceUri):
        """Disconnect from source url and drop everything it asserted."""
        if url == COLLECTOR:
            return

        client = self.clients.get(url)
        if client is not None:
            client.stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        self.clients.pop(url, None)

        self.cleanup()

    def cleanup(self):
        """
        despite the attempts above, we still get useless rows in the table
        sometimes
        """
        with self.statements.postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.table.items():
                if not sources and not any(h in self.handlers for h in handlers):
                    garbage.add(stmt)
411 | |
412 | |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
class State(cyclone.web.RequestHandler):
    """GET /state: JSON dump of GraphClients.state() for debugging."""
    @STATS.getState.time()
    def get(self) -> None:
        try:
            state = self.settings.graphClients.state()
            body = json.dumps({'graphClients': state}, indent=2,
                              default=lambda obj: '<unserializable>')
        except Exception:
            log.exception('building /state report failed')
            raise
        # Label the body as JSON; otherwise the handler's default
        # content type applies.
        self.set_header('Content-Type', 'application/json')
        self.write(body)
423 | |
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
424 |
class Root(cyclone.web.RequestHandler):
    """Bare landing page that only names the service."""
    def get(self) -> None:
        page = '<html><body>sse_collector</body></html>'
        self.write(page)
451
17a556ddc5ac
add types to sse_collector.py. Surprisingly few bugs found.
drewp@bigasterisk.com
parents:
449
diff
changeset
|
428 |
306 | 429 |
if __name__ == '__main__':
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v           Verbose
    """)

    verbose = arg['-v']
    if verbose:
        # -v turns on twisted logging, DEBUG logging for this service, and
        # Deferred debugging.
        enableTwistedLog()
        log.setLevel(logging.DEBUG)
        defer.setDebugging(True)

    graphClients = GraphClients()
    # exporter = InfluxExporter(... to export some stats values

    routes = [
        (r'/', Root),
        (r'/state', State),
        # the (.*) capture becomes PatchSink's streamId in bind()
        (r'/graph/(.*)', PatchSink),
        (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}),
    ]
    # Handlers reach the shared GraphClients as self.settings.graphClients.
    app = cyclone.web.Application(handlers=routes, graphClients=graphClients)
    reactor.listenTCP(9072, app, interface='::')
    reactor.run()