Mercurial > code > home > repos > homeauto
annotate service/reasoning/sse_collector.py @ 302:46c5fae89823
factor out patchsource
Ignore-this: a9757cc53b914cb8be1f880a6504336f
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Aug 2016 23:43:03 -0700 |
parents | 29f593aee67b |
children | 66fe7a93753d |
rev | line source |
---|---|
"""
requesting /graph/foo returns an SSE patch stream that's the
result of fetching multiple other SSE patch streams. The result stream
may include new statements injected by this service.

Future:
- filter out unneeded stmts from the sources
- give a time resolution and concatenate any patches that come faster
  than that resolution
"""
10 | |
# Each stream is requested as /graph/<id>; its 'sources' are the
# upstream SSE patch stream URLs that get merged into it.
config = {
    'streams': [
        {'id': 'home',
         'sources': [
             #'http://bang:9059/graph/events',
             'http://plus:9075/graph/events',
         ]
        },
    ]
}
21 | |
22 from crochet import no_setup | |
23 no_setup() | |
24 | |
302 | 25 import sys, logging, collections |
26 from twisted.internet import reactor | |
296 | 27 import cyclone.web, cyclone.sse |
302 | 28 from rdflib import URIRef, Namespace |
296 | 29 from docopt import docopt |
30 | |
31 | |
32 sys.path.append("../../lib") | |
33 from logsetup import log | |
302 | 34 from patchablegraph import jsonFromPatch |
296 | 35 |
36 sys.path.append("/my/proj/light9") | |
37 from light9.rdfdb.patch import Patch | |
38 | |
302 | 39 from patchsource import ReconnectingPatchSource |
40 | |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
# Namespace for the predicates and state values used in the
# statements this service makes about its sources.
ROOM = Namespace("http://projects.bigasterisk.com/room/")
# Source and graph URI for statements that originate from
# sse_collector itself.
COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
43 |
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
class LocalStatements(object):
    """
    functions that make statements originating from sse_collector itself
    """
    def __init__(self, applyPatch):
        # applyPatch(source, patch) receives every patch we generate;
        # the source passed is always COLLECTOR.
        self.applyPatch = applyPatch
        self._sourceState = {} # source: state URIRef

    def setSourceState(self, source, state):
        """
        add a patch to the COLLECTOR graph about the state of this
        source. state=None to remove the source.

        Nothing is emitted when the state is unchanged.
        """
        oldState = self._sourceState.get(source, None)
        if state == oldState:
            return
        log.info('source state %s -> %s', source, state)
        if oldState is None:
            # first time we hear about this source: announce the
            # source itself along with its state
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(addQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], state, COLLECTOR),
            ]))
        elif state is None:
            # source is going away: retract both statements
            del self._sourceState[source]
            self.applyPatch(COLLECTOR, Patch(delQuads=[
                (COLLECTOR, ROOM['source'], source, COLLECTOR),
                (source, ROOM['state'], oldState, COLLECTOR),
            ]))
        else:
            # known source changing state: swap just the state statement
            self._sourceState[source] = state
            self.applyPatch(COLLECTOR, Patch(
                addQuads=[
                    (source, ROOM['state'], state, COLLECTOR),
                ],
                delQuads=[
                    (source, ROOM['state'], oldState, COLLECTOR),
                ]))
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
82 |
def abbrevTerm(t):
    """
    Shorten a URIRef for log output by swapping the two namespaces
    this service uses for 'room:' and 'sc:' prefixes. Anything that
    isn't a URIRef is returned unchanged.
    """
    if isinstance(t, URIRef):
        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
    return t
88 | |
def abbrevStmt(stmt):
    """
    Format a 4-term statement as '(s p o c)' with each term
    abbreviated by abbrevTerm.
    """
    return '(%s %s %s %s)' % tuple(abbrevTerm(term) for term in stmt)
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
91 |
class ActiveStatements(object):
    """
    Table of statements, each with the set of sources currently
    asserting it and the set of handlers that have been sent it.
    Rows are dropped once both sets are empty.
    """
    def __init__(self):

        # This table holds statements asserted by any of our sources
        # plus local statements that we introduce (source is
        # http://bigasterisk.com/sse_collector/).
        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)

    def _postDeleteStatements(self):
        """
        Return a context manager that collects statements via add()
        and deletes them from the table when the block exits cleanly.
        This lets callers mark rows for removal while they are
        iterating over self.statements. If the block raises, nothing
        is deleted and the exception propagates.
        """
        statements = self.statements
        class PostDeleter(object):
            def __enter__(self):
                self._garbage = []
                return self
            def add(self, stmt):
                self._garbage.append(stmt)
            def __exit__(self, excType, value, traceback):
                if excType is not None:
                    # a false return re-raises the in-flight exception
                    return False
                for stmt in self._garbage:
                    del statements[stmt]
                return False
        return PostDeleter()

    def pprintTable(self):
        """Print one line per statement with its sources and handlers."""
        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
            # single parenthesized argument: prints the same under the
            # print statement and the print function
            print("%03d. %-80s from %s to %s" % (
                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers))

    def makeSyncPatch(self, handler, sources):
        """
        Return the Patch that brings `handler` up to date with the
        statements asserted by any of `sources`, and record in the
        table that the handler now has (or no longer has) them.
        """
        # todo: this could run all handlers at once, which is how we use it anyway
        adds = []
        dels = []
        wantedSources = set(sources)

        with self._postDeleteStatements() as garbage:
            for stmt, (stmtSources, handlers) in self.statements.items():
                belongsInHandler = not wantedSources.isdisjoint(stmtSources)
                handlerHasIt = handler in handlers
                #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
                if belongsInHandler and not handlerHasIt:
                    adds.append(stmt)
                    handlers.add(handler)
                elif not belongsInHandler and handlerHasIt:
                    dels.append(stmt)
                    handlers.remove(handler)
                    if not handlers and not stmtSources:
                        garbage.add(stmt)

        return Patch(addQuads=adds, delQuads=dels)

    def applySourcePatch(self, source, p):
        """
        Record that `source` now asserts p.addQuads and no longer
        asserts p.delQuads.

        Raises ValueError if the source adds a statement it already
        had or deletes one it didn't have.
        """
        for stmt in p.addQuads:
            sourceUrls, handlers = self.statements[stmt]
            if source in sourceUrls:
                raise ValueError("%s added stmt that it already had: %s" %
                                 (source, abbrevStmt(stmt)))
            sourceUrls.add(source)

        with self._postDeleteStatements() as garbage:
            for stmt in p.delQuads:
                # .get() rather than indexing: indexing the defaultdict
                # would insert an empty row for an unknown stmt, and
                # that row would be left behind when we raise
                row = self.statements.get(stmt)
                if row is None or source not in row[0]:
                    raise ValueError("%s deleting stmt that it didn't have: %s" %
                                     (source, abbrevStmt(stmt)))
                sourceUrls, handlers = row
                sourceUrls.remove(source)
                # this is rare, since some handler probably still has
                # the stmt we're deleting, but it can happen e.g. when
                # a handler was just deleted
                if not sourceUrls and not handlers:
                    garbage.add(stmt)

    def replaceSourceStatements(self, source, stmts):
        """
        Make `stmts` the complete set of statements asserted by
        `source`: existing rows gain or lose the source as needed, and
        statements with no row yet are added via applySourcePatch.
        """
        log.debug('replaceSourceStatements with %s stmts', len(stmts))
        wanted = set(stmts)
        # whatever is left in here after the loop has no row yet
        newStmts = set(wanted)

        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                if stmt in wanted:
                    sources.add(source)
                    newStmts.discard(stmt)
                elif source in sources:
                    sources.remove(source)
                    if not sources and not handlers:
                        garbage.add(stmt)

        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))

    def discardHandler(self, handler):
        """Forget `handler` on every row, dropping rows left empty."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                handlers.discard(handler)
                if not sources and not handlers:
                    garbage.add(stmt)

    def discardSource(self, source):
        """Forget `source` on every row, dropping rows left empty."""
        with self._postDeleteStatements() as garbage:
            for stmt, (sources, handlers) in self.statements.items():
                sources.discard(source)
                if not sources and not handlers:
                    garbage.add(stmt)
193 | |
class GraphClients(object):
    """
    All the active PatchSources and SSEHandlers

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    """
    def __init__(self):
        self.clients = {} # url: PatchSource (COLLECTOR is not listed)
        self.handlers = set() # handler
        self.statements = ActiveStatements()

        self._localStatements = LocalStatements(self._onPatch)

    def _sourcesForHandler(self, handler):
        """
        Return the source URIs configured for the handler's stream,
        with COLLECTOR appended.

        Raises ValueError unless exactly one configured stream has the
        handler's streamId.
        """
        streamId = handler.streamId
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))
        return [URIRef(s) for s in matches[0]['sources']] + [COLLECTOR]

    def _onPatch(self, source, p, fullGraph=False):
        """
        Apply a patch from `source` to the statement table, push the
        resulting changes to all handlers, and (for sources other than
        COLLECTOR) update the source's state statement.
        """
        if fullGraph:
            # a reconnect may need to resend the full graph even
            # though we've already sent some statements
            self.statements.replaceSourceStatements(source, p.addQuads)
        else:
            self.statements.applySourcePatch(source, p)

        self._sendUpdatePatch()

        if log.isEnabledFor(logging.DEBUG):
            self.statements.pprintTable()

        if source != COLLECTOR:
            self._localStatements.setSourceState(
                source,
                ROOM['fullGraphReceived'] if fullGraph else
                ROOM['patchesReceived'])

    def _sendUpdatePatch(self, handler=None):
        """
        send a patch event out this handler to bring it up to date with
        self.statements

        With handler=None, every registered handler is updated.
        """
        # reduce loops here- prepare all patches at once
        for h in (self.handlers if handler is None else [handler]):
            p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
            if not p.isNoop():
                log.debug("send patch %s to %s", p.shortSummary(), h)
                h.sendEvent(message=jsonFromPatch(p), event='patch')

    def addSseHandler(self, handler):
        """
        Register a handler, start a patch source for each of its
        stream's sources that isn't already running, and send the
        handler the statements it should currently have.

        Raises ValueError (from _sourcesForHandler) for an unknown
        streamId; in that case the handler is not registered.
        """
        log.info('addSseHandler %r %r', handler, handler.streamId)
        # Resolve the sources before registering: a handler with a bad
        # streamId left in self.handlers would make every later
        # _sendUpdatePatch() broadcast raise.
        sources = self._sourcesForHandler(handler)
        self.handlers.add(handler)

        for source in sources:
            if source not in self.clients and source != COLLECTOR:
                self._localStatements.setSourceState(source, ROOM['connect'])
                # source=source binds the current loop value into the callback
                self.clients[source] = ReconnectingPatchSource(
                    source, listener=lambda p, fullGraph, source=source: self._onPatch(
                        source, p, fullGraph))
        self._sendUpdatePatch(handler)

    def removeSseHandler(self, handler):
        """
        Unregister a handler and stop any patch source that no
        remaining handler uses. Does nothing for a handler that was
        never registered.
        """
        log.info('removeSseHandler %r', handler)
        if handler not in self.handlers:
            # e.g. addSseHandler rejected its streamId
            return

        # Unregister before stopping clients: _stopClient emits a
        # COLLECTOR patch that is broadcast to self.handlers, and that
        # broadcast must not sync the departing handler (which would
        # put it back into the statement table).
        self.handlers.remove(handler)
        self.statements.discardHandler(handler)

        for source in self._sourcesForHandler(handler):
            stillUsed = any(source in self._sourcesForHandler(otherHandler)
                            for otherHandler in self.handlers)
            if not stillUsed:
                self._stopClient(source)

    def _stopClient(self, url):
        """
        Stop the patch source for `url`, drop its statements and its
        state statement, and forget the client. COLLECTOR has no
        client, so it is ignored.
        """
        if url == COLLECTOR:
            return

        self.clients[url].stop()

        self.statements.discardSource(url)

        self._localStatements.setSourceState(url, None)
        del self.clients[url]
301 | 285 |
300
371af6e92b5e
local state statements and self.statements rewrite
drewp@bigasterisk.com
parents:
299
diff
changeset
|
286 |
class SomeGraph(cyclone.sse.SSEHandler):
    """
    SSE request handler for /graph/<streamId>. It registers itself
    with the application's GraphClients in bind() and unregisters in
    unbind().
    """
    # class-wide counter, used only to give each handler a short repr
    _handlerSerial = 0
    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # everything after the '/graph/' prefix of the request uri
        self.streamId = request.uri[len('/graph/'):]
        self.graphClients = self.settings.graphClients

        self._serial = SomeGraph._handlerSerial
        SomeGraph._handlerSerial += 1

    def __repr__(self):
        return '<Handler #%s>' % self._serial

    def bind(self):
        self.graphClients.addSseHandler(self)

    def unbind(self):
        self.graphClients.removeSseHandler(self)
305 | |
if __name__ == '__main__':

    # docopt needs at least two spaces between an option and its
    # description; with one space 'Verbose' would be read as -v's argument.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v   Verbose
    """)

    if arg['-v']:
        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)


    # one shared GraphClients instance, reached by handlers via
    # self.settings.graphClients
    graphClients = GraphClients()

    reactor.listenTCP(
        9072,
        cyclone.web.Application(
            handlers=[
                (r'/graph/(.*)', SomeGraph),
            ],
            graphClients=graphClients),
        interface='::')
    reactor.run()