Mercurial > code > home > repos > homeauto
annotate service/collector/sse_collector.py @ 443:2f7bc2ecf6b5
more of the stats and logging patch for collector
Ignore-this: 3c69ae831f7759282c64ce7204f424ea
author | drewp@bigasterisk.com |
---|---|
date | Thu, 18 Apr 2019 09:17:00 -0700 |
parents | ee74dc3b58fb |
children | 96d712dccf28 |
rev | line source |
---|---|
306 | 1 from __future__ import division |
296 | 2 """ |
3 requesting /graph/foo returns an SSE patch stream that's the | |
4 result of fetching multiple other SSE patch streams. The result stream | |
5 may include new statements injected by this service. | |
6 | |
7 Future: | |
8 - filter out unneeded stmts from the sources | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
9 - give a time resolution and concatenate any patches that come faster than that res |
296 | 10 """ |
11 from crochet import no_setup | |
12 no_setup() | |
13 | |
306 | 14 import sys, logging, collections, json, time |
442
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
15 from twisted.internet import reactor, defer |
296 | 16 import cyclone.web, cyclone.sse |
302 | 17 from rdflib import URIRef, Namespace |
296 | 18 from docopt import docopt |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
19 from greplin import scales |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
20 from greplin.scales.cyclonehandler import StatsHandler |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
21 from logsetup import log, enableTwistedLog |
296 | 22 from logsetup import log |
302 | 23 from patchablegraph import jsonFromPatch |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
24 from rdfdb.patch import Patch |
302 | 25 from patchsource import ReconnectingPatchSource |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
26 from sse_collector_config import config |
302 | 27 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
28 ROOM = Namespace("http://projects.bigasterisk.com/room/") |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
29 COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
30 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
31 STATS = scales.collection('/root', |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
32 scales.PmfStat('getState'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
33 scales.PmfStat('localStatementsPatch'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
34 scales.PmfStat('makeSyncPatch'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
35 scales.PmfStat('onPatch'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
36 scales.PmfStat('sendUpdatePatch'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
37 scales.PmfStat('replaceSourceStatements'), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
38 ) |
351
7716b1810d6c
reasoning & collector move into docker images
drewp@bigasterisk.com
parents:
316
diff
changeset
|
39 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
40 class LocalStatements(object): |
301 | 41 """ |
42 functions that make statements originating from sse_collector itself | |
43 """ | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
44 def __init__(self, applyPatch): |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
45 self.applyPatch = applyPatch |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
46 self._sourceState = {} # source: state URIRef |
306 | 47 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
48 @STATS.localStatementsPatch.time() |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
49 def setSourceState(self, source, state): |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
50 """ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
51 add a patch to the COLLECTOR graph about the state of this |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
52 source. state=None to remove the source. |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
53 """ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
54 oldState = self._sourceState.get(source, None) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
55 if state == oldState: |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
56 return |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
57 log.info('source state %s -> %s', source, state) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
58 if oldState is None: |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
59 self._sourceState[source] = state |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
60 self.applyPatch(COLLECTOR, Patch(addQuads=[ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
61 (COLLECTOR, ROOM['source'], source, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
62 (source, ROOM['state'], state, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
63 ])) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
64 elif state is None: |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
65 del self._sourceState[source] |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
66 self.applyPatch(COLLECTOR, Patch(delQuads=[ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
67 (COLLECTOR, ROOM['source'], source, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
68 (source, ROOM['state'], oldState, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
69 ])) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
70 else: |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
71 self._sourceState[source] = state |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
72 self.applyPatch(COLLECTOR, Patch( |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
73 addQuads=[ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
74 (source, ROOM['state'], state, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
75 ], |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
76 delQuads=[ |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
77 (source, ROOM['state'], oldState, COLLECTOR), |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
78 ])) |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
79 |
301 | 80 def abbrevTerm(t): |
81 if isinstance(t, URIRef): | |
82 return (t.replace('http://projects.bigasterisk.com/room/', 'room:') | |
83 .replace('http://bigasterisk.com/sse_collector/', 'sc:')) | |
84 return t | |
85 | |
86 def abbrevStmt(stmt): | |
87 return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
88 |
301 | 89 class ActiveStatements(object): |
296 | 90 def __init__(self): |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
91 |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
92 # This table holds statements asserted by any of our sources |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
93 # plus local statements that we introduce (source is |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
94 # http://bigasterisk.com/sse_collector/). |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
95 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` |
306 | 96 |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
97 def state(self): |
306 | 98 return { |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
99 'len': len(self.statements), |
306 | 100 } |
101 | |
301 | 102 def _postDeleteStatements(self): |
103 statements = self.statements | |
104 class PostDeleter(object): | |
105 def __enter__(self): | |
106 self._garbage = [] | |
107 return self | |
108 def add(self, stmt): | |
109 self._garbage.append(stmt) | |
110 def __exit__(self, type, value, traceback): | |
111 if type is not None: | |
112 raise | |
113 for stmt in self._garbage: | |
114 del statements[stmt] | |
115 return PostDeleter() | |
116 | |
117 def pprintTable(self): | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
118 for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): |
301 | 119 print "%03d. %-80s from %s to %s" % ( |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
120 i, |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
121 abbrevStmt(stmt), |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
122 [abbrevTerm(s) for s in sources], |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
123 handlers) |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
124 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
125 @STATS.makeSyncPatch.time() |
301 | 126 def makeSyncPatch(self, handler, sources): |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
127 # todo: this could run all handlers at once, which is how we use it anyway |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
128 adds = [] |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
129 dels = [] |
301 | 130 |
131 with self._postDeleteStatements() as garbage: | |
132 for stmt, (stmtSources, handlers) in self.statements.iteritems(): | |
133 belongsInHandler = not set(sources).isdisjoint(stmtSources) | |
134 handlerHasIt = handler in handlers | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
135 log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) |
301 | 136 if belongsInHandler and not handlerHasIt: |
137 adds.append(stmt) | |
138 handlers.add(handler) | |
139 elif not belongsInHandler and handlerHasIt: | |
140 dels.append(stmt) | |
141 handlers.remove(handler) | |
142 if not handlers and not stmtSources: | |
143 garbage.add(stmt) | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
144 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
145 return Patch(addQuads=adds, delQuads=dels) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
146 |
301 | 147 def applySourcePatch(self, source, p): |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
148 for stmt in p.addQuads: |
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
149 sourceUrls, handlers = self.statements[stmt] |
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
150 if source in sourceUrls: |
301 | 151 raise ValueError("%s added stmt that it already had: %s" % |
152 (source, abbrevStmt(stmt))) | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
153 sourceUrls.add(source) |
301 | 154 |
155 with self._postDeleteStatements() as garbage: | |
156 for stmt in p.delQuads: | |
157 sourceUrls, handlers = self.statements[stmt] | |
158 if source not in sourceUrls: | |
159 raise ValueError("%s deleting stmt that it didn't have: %s" % | |
160 (source, abbrevStmt(stmt))) | |
161 sourceUrls.remove(source) | |
162 # this is rare, since some handler probably still has | |
163 # the stmt we're deleting, but it can happen e.g. when | |
164 # a handler was just deleted | |
165 if not sourceUrls and not handlers: | |
166 garbage.add(stmt) | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
167 |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
168 @STATS.replaceSourceStatements.time() |
301 | 169 def replaceSourceStatements(self, source, stmts): |
170 log.debug('replaceSourceStatements with %s stmts', len(stmts)) | |
171 newStmts = set(stmts) | |
172 | |
173 with self._postDeleteStatements() as garbage: | |
174 for stmt, (sources, handlers) in self.statements.iteritems(): | |
175 if source in sources: | |
176 if stmt not in stmts: | |
177 sources.remove(source) | |
178 if not sources and not handlers: | |
179 garbage.add(stmt) | |
180 else: | |
181 if stmt in stmts: | |
182 sources.add(source) | |
183 newStmts.discard(stmt) | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
184 |
301 | 185 self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) |
186 | |
187 def discardHandler(self, handler): | |
188 with self._postDeleteStatements() as garbage: | |
189 for stmt, (sources, handlers) in self.statements.iteritems(): | |
190 handlers.discard(handler) | |
191 if not sources and not handlers: | |
192 garbage.add(stmt) | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
193 |
301 | 194 def discardSource(self, source): |
195 with self._postDeleteStatements() as garbage: | |
196 for stmt, (sources, handlers) in self.statements.iteritems(): | |
197 sources.discard(source) | |
198 if not sources and not handlers: | |
199 garbage.add(stmt) | |
200 | |
201 class GraphClients(object): | |
202 """ | |
203 All the active PatchSources and SSEHandlers | |
204 | |
205 To handle all the overlapping-statement cases, we store a set of | |
206 true statements along with the sources that are currently | |
207 asserting them and the requesters who currently know them. As | |
208 statements come and go, we make patches to send to requesters. | |
209 """ | |
210 def __init__(self): | |
211 self.clients = {} # url: PatchSource (COLLECTOR is not listed) | |
212 self.handlers = set() # handler | |
213 self.statements = ActiveStatements() | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
214 |
301 | 215 self._localStatements = LocalStatements(self._onPatch) |
216 | |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
217 def state(self): |
306 | 218 return { |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
219 'clients': [ps.state() for ps in self.clients.values()], |
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
220 'sseHandlers': [h.state() for h in self.handlers], |
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
221 'statements': self.statements.state(), |
306 | 222 } |
223 | |
301 | 224 def _sourcesForHandler(self, handler): |
225 streamId = handler.streamId | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
226 matches = [s for s in config['streams'] if s['id'] == streamId] |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
227 if len(matches) != 1: |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
228 raise ValueError("%s matches for %r" % (len(matches), streamId)) |
301 | 229 return map(URIRef, matches[0]['sources']) + [COLLECTOR] |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
230 |
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
231 @STATS.onPatch.time() |
301 | 232 def _onPatch(self, source, p, fullGraph=False): |
233 if fullGraph: | |
234 # a reconnect may need to resend the full graph even | |
235 # though we've already sent some statements | |
236 self.statements.replaceSourceStatements(source, p.addQuads) | |
237 else: | |
238 self.statements.applySourcePatch(source, p) | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
239 |
301 | 240 self._sendUpdatePatch() |
241 | |
242 if log.isEnabledFor(logging.DEBUG): | |
243 self.statements.pprintTable() | |
244 | |
245 if source != COLLECTOR: | |
246 self._localStatements.setSourceState( | |
247 source, | |
248 ROOM['fullGraphReceived'] if fullGraph else | |
249 ROOM['patchesReceived']) | |
250 | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
251 @STATS.sendUpdatePatch.time() |
301 | 252 def _sendUpdatePatch(self, handler=None): |
253 """ | |
254 send a patch event out this handler to bring it up to date with | |
255 self.statements | |
256 """ | |
257 # reduce loops here- prepare all patches at once | |
258 for h in (self.handlers if handler is None else [handler]): | |
259 p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h)) | |
443
2f7bc2ecf6b5
more of the stats and logging patch for collector
drewp@bigasterisk.com
parents:
442
diff
changeset
|
260 log.debug('makeSyncPatch for %r: %r', h, p.jsonRepr) |
301 | 261 if not p.isNoop(): |
262 log.debug("send patch %s to %s", p.shortSummary(), h) | |
316
02d9915b3bbb
patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents:
313
diff
changeset
|
263 # This can be a giant line, which was a problem once. Might be |
02d9915b3bbb
patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents:
313
diff
changeset
|
264 # nice for this service to try to break it up into multiple sends, |
02d9915b3bbb
patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents:
313
diff
changeset
|
265 # although there's no guarantee at all since any single stmt |
02d9915b3bbb
patchsource accept much longer lines from sse_collector
drewp@bigasterisk.com
parents:
313
diff
changeset
|
266 # could be any length. |
301 | 267 h.sendEvent(message=jsonFromPatch(p), event='patch') |
306 | 268 |
301 | 269 def addSseHandler(self, handler): |
270 log.info('addSseHandler %r %r', handler, handler.streamId) | |
313 | 271 |
272 # fail early if id doesn't match | |
273 sources = self._sourcesForHandler(handler) | |
274 | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
275 self.handlers.add(handler) |
301 | 276 |
313 | 277 for source in sources: |
301 | 278 if source not in self.clients and source != COLLECTOR: |
442
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
279 log.debug('connect to patch source %s', source) |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
280 self._localStatements.setSourceState(source, ROOM['connect']) |
302 | 281 self.clients[source] = ReconnectingPatchSource( |
301 | 282 source, listener=lambda p, fullGraph, source=source: self._onPatch( |
283 source, p, fullGraph)) | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
284 self._sendUpdatePatch(handler) |
296 | 285 |
286 def removeSseHandler(self, handler): | |
287 log.info('removeSseHandler %r', handler) | |
301 | 288 self.statements.discardHandler(handler) |
289 for source in self._sourcesForHandler(handler): | |
290 for otherHandler in self.handlers: | |
291 if (otherHandler != handler and | |
292 source in self._sourcesForHandler(otherHandler)): | |
293 break | |
294 else: | |
295 self._stopClient(source) | |
296 | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
297 self.handlers.remove(handler) |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
298 |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
299 def _stopClient(self, url): |
301 | 300 if url == COLLECTOR: |
301 return | |
302 | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
303 self.clients[url].stop() |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
304 |
301 | 305 self.statements.discardSource(url) |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
306 |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
307 self._localStatements.setSourceState(url, None) |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
308 del self.clients[url] |
301 | 309 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
310 |
296 | 311 class SomeGraph(cyclone.sse.SSEHandler): |
301 | 312 _handlerSerial = 0 |
296 | 313 def __init__(self, application, request): |
314 cyclone.sse.SSEHandler.__init__(self, application, request) | |
301 | 315 self.streamId = request.uri[len('/graph/'):] |
296 | 316 self.graphClients = self.settings.graphClients |
306 | 317 self.created = time.time() |
296 | 318 |
301 | 319 self._serial = SomeGraph._handlerSerial |
320 SomeGraph._handlerSerial += 1 | |
321 | |
322 def __repr__(self): | |
323 return '<Handler #%s>' % self._serial | |
306 | 324 |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
325 def state(self): |
306 | 326 return { |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
327 'created': round(self.created, 2), |
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
328 'ageHours': round((time.time() - self.created) / 3600, 2), |
306 | 329 'streamId': self.streamId, |
330 'remoteIp': self.request.remote_ip, | |
331 'userAgent': self.request.headers.get('user-agent'), | |
332 } | |
301 | 333 |
296 | 334 def bind(self): |
301 | 335 self.graphClients.addSseHandler(self) |
296 | 336 |
337 def unbind(self): | |
338 self.graphClients.removeSseHandler(self) | |
339 | |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
340 class State(cyclone.web.RequestHandler): |
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
341 @STATS.getState.time() |
306 | 342 def get(self): |
343 try: | |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
344 state = self.settings.graphClients.state() |
306 | 345 except: |
346 import traceback; traceback.print_exc() | |
347 raise | |
348 | |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
349 self.write(json.dumps({'graphClients': state}, indent=2)) |
313 | 350 |
351 class Root(cyclone.web.RequestHandler): | |
352 def get(self): | |
353 self.write('<html><body>sse_collector</body></html>') | |
306 | 354 |
296 | 355 if __name__ == '__main__': |
356 arg = docopt(""" | |
357 Usage: sse_collector.py [options] | |
358 | |
359 -v Verbose | |
360 """) | |
361 | |
362 if arg['-v']: | |
363 import twisted.python.log | |
364 twisted.python.log.startLogging(sys.stdout) | |
442
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
365 log.setLevel(logging.DEBUG) |
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
366 defer.setDebugging(True) |
296 | 367 |
368 | |
369 graphClients = GraphClients() | |
442
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
370 #exporter = InfluxExporter(... to export some stats values |
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
371 |
296 | 372 reactor.listenTCP( |
299 | 373 9072, |
296 | 374 cyclone.web.Application( |
375 handlers=[ | |
313 | 376 (r'/', Root), |
439
124c921ad52d
stats->state to make room for greplin stats
drewp@bigasterisk.com
parents:
353
diff
changeset
|
377 (r'/state', State), |
296 | 378 (r'/graph/(.*)', SomeGraph), |
442
ee74dc3b58fb
collector build improvements; stats and logging
drewp@bigasterisk.com
parents:
439
diff
changeset
|
379 (r'/stats/(.*)', StatsHandler, {'serverName': 'collector'}), |
296 | 380 ], |
381 graphClients=graphClients), | |
382 interface='::') | |
383 reactor.run() |