Mercurial > code > home > repos > homeauto
annotate service/reasoning/sse_collector.py @ 299:5084a1f719c9
port 9072
Ignore-this: b551e2a05376292330fc817ea9fa7ca2
author | drewp@bigasterisk.com |
---|---|
date | Fri, 19 Aug 2016 22:46:33 -0700 |
parents | 8d89da1915df |
children | 371af6e92b5e |
rev | line source |
---|---|
296 | 1 """ |
2 requesting /graph/foo returns an SSE patch stream that's the | |
3 result of fetching multiple other SSE patch streams. The result stream | |
4 may include new statements injected by this service. | |
5 | |
6 Future: | |
7 - filter out unneeded stmts from the sources | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
8 - give a time resolution and concatenate any patches that arrive faster than that resolution |
296 | 9 """ |
10 | |
# Which output streams this service offers, and the upstream SSE patch
# streams that get merged into each one.
# Another upstream that has been used for 'home': 'http://bang:9059/graph/events'
config = dict(
    streams=[
        dict(
            id='home',
            sources=['http://plus:9075/graph/events'],
        ),
    ],
)
21 | |
22 from crochet import no_setup | |
23 no_setup() | |
24 | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
25 import sys, logging, traceback, json, collections |
296 | 26 from twisted.internet import reactor |
27 import cyclone.web, cyclone.sse | |
28 from rdflib import ConjunctiveGraph | |
29 from rdflib.parser import StringInputSource | |
30 from docopt import docopt | |
31 | |
32 from twisted_sse_demo.eventsource import EventSource | |
33 | |
34 sys.path.append("../../lib") | |
35 from logsetup import log | |
298
8d89da1915df
sse_collector now kind of gets concurrent requests right
drewp@bigasterisk.com
parents:
296
diff
changeset
|
36 from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson |
296 | 37 |
38 sys.path.append("/my/proj/light9") | |
39 from light9.rdfdb.patch import Patch | |
40 | |
class PatchSource(object):
    """Wrap EventSource so it emits Patch objects and has an explicit stop method.

    Every registered listener is called with a Patch for each 'fullGraph'
    event (the parsed json-ld graph, sent as an add-only patch) and for each
    'patch' event received from `url`. Call stop() when nobody needs the
    source any more.
    """
    def __init__(self, url):
        self.url = url
        self._listeners = set()
        log.info('start read from %s', url)
        self._eventSource = EventSource(url)
        # NOTE(review): presumably makes the line-based SSE parser split on
        # bare '\n' rather than '\r\n' -- confirm against twisted_sse_demo.
        self._eventSource.protocol.delimiter = '\n'

        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
        self._eventSource.addEventListener('patch', self._onMessage)

    def _onFullGraph(self, message):
        """Parse a json-ld 'fullGraph' message and send it on as an add-only Patch.

        Parse or listener errors are logged with their traceback and re-raised.
        """
        try:
            g = ConjunctiveGraph()
            g.parse(StringInputSource(message), format='json-ld')
            p = Patch(addGraph=g)
            self._sendPatch(p)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _onMessage(self, message):
        """Decode a JSON 'patch' message and send it on to the listeners.

        Decode or listener errors are logged with their traceback and re-raised.
        """
        try:
            p = patchFromJson(message)
            self._sendPatch(p)
        except Exception:
            log.error(traceback.format_exc())
            raise

    def _sendPatch(self, p):
        log.debug('PatchSource received patch %s', p.shortSummary())
        # Iterate over a snapshot so a listener that registers another
        # listener can't break this loop ("Set changed size during iteration").
        for lis in list(self._listeners):
            lis(p)

    def addPatchListener(self, func):
        """Call func(patch) for every patch this source produces from now on."""
        self._listeners.add(func)

    def stop(self):
        """Stop reading from the url. Safe to call more than once."""
        log.info('stop read from %s', self.url)
        try:
            # NOTE(review): the original author was unsure this is the right
            # way to close the connection.
            self._eventSource.protocol.stopProducing()
        except AttributeError:
            # _eventSource is already None (stopped), or it has no protocol yet.
            pass
        self._eventSource = None

    def __del__(self):
        # Catches PatchSources that were dropped without stop(). An exception
        # raised in __del__ is ignored by Python and only printed, so the
        # problem is reported through the log instead. getattr covers
        # instances whose __init__ failed before _eventSource was set.
        if getattr(self, '_eventSource', None) is not None:
            log.warning('PatchSource for %s was garbage-collected without stop()',
                        self.url)
90 | |
class GraphClient(object):
    """Relay patches from any number of PatchSources to one cyclone SSE handler.

    Incoming patches are applied to a private PatchableGraph. That graph's
    observer callback then sends each resulting patch to the handler as a
    'patch' event.
    """

    def __init__(self, handler):
        self.handler = handler

        # The graph this requester currently knows about. Two requests for
        # the same streamId will often hold the same contents here, and
        # sharing one graph between them would be possible. Keeping one graph
        # per client is simpler, and lets clients with different throttling
        # or different stalls drift apart for a while.
        self._currentGraph = graph = PatchableGraph()
        graph.addObserver(self._sendPatch)

    def addPatchSource(self, ps):
        """Subscribe to PatchSource `ps`; its patches get applied to our output graph."""
        # The subscription is never released, so patches keep arriving until
        # nobody wants the source any more.
        listener = self._onPatch
        ps.addPatchListener(listener)

    def _onPatch(self, p):
        graph = self._currentGraph
        graph.patch(p)

    def _sendPatch(self, jsonPatch):
        # NOTE(review): presumably a JSON-serialized patch from PatchableGraph's
        # observer, going by the parameter name -- confirm in patchablegraph.
        self.handler.sendEvent(event='patch', message=jsonPatch)
296 | 119 |
class GraphClients(object):
    """
    Registry of all active SSE requesters (handlers) and the PatchSources feeding them.

    To handle all the overlapping-statement cases, we store a set of
    true statements along with the sources that are currently
    asserting them and the requesters who currently know them. As
    statements come and go, we make patches to send to requesters.
    A requester is sent exactly the statements that at least one of its
    stream's source urls asserts.

    todo: reconnect patchsources that go down and deal with their graph diffs
    """
    def __init__(self):
        self.clients = {}  # url: PatchSource
        self.handlers = set()  # handler
        self.listeners = {}  # url: [handler] (handler may appear under multiple urls)
        # (s,p,o,c): (sourceUrls, handlers). An entry is dropped once no
        # source asserts it and no handler still knows it.
        self.statements = collections.defaultdict(lambda: (set(), set()))

    def addSseHandler(self, handler, streamId):
        """Register handler for config stream streamId and send it the current statements.

        Starts a PatchSource for any of the stream's source urls not already
        being read. Raises ValueError unless exactly one configured stream
        has that id.
        """
        log.info('addSseHandler %r %r', handler, streamId)
        matches = [s for s in config['streams'] if s['id'] == streamId]
        if len(matches) != 1:
            raise ValueError("%s matches for %r" % (len(matches), streamId))

        self.handlers.add(handler)
        for source in matches[0]['sources']:
            if source not in self.clients:
                ps = self.clients[source] = PatchSource(source)
                # default arg binds the current url, not the loop variable
                ps.addPatchListener(lambda p, source=source: self._onPatch(source, p))
            self.listeners.setdefault(source, []).append(handler)
        self._sendUpdatePatch(handler)

    def _onPatch(self, source, p):
        """Apply patch p from source url to self.statements, then update that url's handlers.

        The whole patch is checked before anything changes. Re-adding a
        statement this source already asserts, or deleting one it doesn't,
        raises ValueError and leaves self.statements untouched.
        """
        # stmt -> whether `source` asserts it once this patch is applied
        pending = {}

        def asserted(stmt):
            if stmt in pending:
                return pending[stmt]
            entry = self.statements.get(stmt)  # .get: don't create entries
            return entry is not None and source in entry[0]

        for stmt in p.addQuads:
            if asserted(stmt):
                raise ValueError("%s added stmt that it already had: %s" % (source, stmt))
            pending[stmt] = True
        for stmt in p.delQuads:
            if not asserted(stmt):
                raise ValueError("%s deleting stmt that it didn't have: %s" % (source, stmt))
            pending[stmt] = False

        for stmt, present in pending.items():
            sourceUrls = self.statements[stmt][0]
            if present:
                sourceUrls.add(source)
            else:
                sourceUrls.discard(source)
                self._dropIfUnused(stmt)

        # Only requesters listening to this url can see a difference.
        for h in list(self.listeners.get(source, [])):
            self._sendUpdatePatch(h)

    def _sourcesFor(self, handler):
        """Return the set of source urls whose statements handler should see."""
        return set(url for url, hs in self.listeners.items() if handler in hs)

    def _dropIfUnused(self, stmt):
        """Forget stmt once no source asserts it and no handler knows it."""
        entry = self.statements.get(stmt)
        if entry is not None and not entry[0] and not entry[1]:
            del self.statements[stmt]

    def _sendUpdatePatch(self, handler):
        """send a patch event out this handler to bring it up to date with self.statements"""
        visible = self._sourcesFor(handler)
        adds = []
        dels = []
        # Snapshot the items, because _dropIfUnused may delete entries.
        for stmt, (sources, handlers) in list(self.statements.items()):
            wanted = not sources.isdisjoint(visible)
            known = handler in handlers
            if wanted and not known:
                adds.append(stmt)
                handlers.add(handler)
            elif known and not wanted:
                dels.append(stmt)
                handlers.remove(handler)
                self._dropIfUnused(stmt)
        p = Patch(addQuads=adds, delQuads=dels)
        if not p.isNoop():
            log.debug("send patch %s to %s", p.shortSummary(), handler)
            handler.sendEvent(message=jsonFromPatch(p), event='patch')

    def removeSseHandler(self, handler):
        """Unregister handler and stop any PatchSource that no handler listens to any more.

        Safe to call for a handler that was never (successfully) added.
        """
        log.info('removeSseHandler %r', handler)
        stoppedUrls = set()
        # Iterate a snapshot: entries are deleted inside the loop.
        for url, handlers in list(self.listeners.items()):
            keep = [h for h in handlers if h != handler]
            handlers[:] = keep
            if not keep:
                self.clients[url].stop()
                del self.clients[url]
                del self.listeners[url]
                stoppedUrls.add(url)

        # Forget what this handler knew and what the stopped urls asserted.
        # Otherwise a later PatchSource for the same url would re-send its
        # graph and trip the "already had" check in _onPatch.
        for stmt, (sources, handlers) in list(self.statements.items()):
            sources.difference_update(stoppedUrls)
            handlers.discard(handler)
            self._dropIfUnused(stmt)
        self.handlers.discard(handler)
296 | 199 |
class SomeGraph(cyclone.sse.SSEHandler):
    """SSE endpoint at /graph/<streamId> that relays the merged patch stream for one configured stream."""
    def __init__(self, application, request, **kwargs):
        # **kwargs passes through any extra arguments cyclone gives the handler.
        cyclone.sse.SSEHandler.__init__(self, application, request, **kwargs)
        # Use the path, not the full uri, so that a query string such as
        # '/graph/home?x=1' still selects stream 'home'.
        self.id = request.path[len('/graph/'):]
        self.graphClients = self.settings.graphClients

    def bind(self):
        # NOTE(review): presumably called by cyclone.sse.SSEHandler when the
        # client connects -- confirm.
        self.graphClients.addSseHandler(self, self.id)

    def unbind(self):
        # NOTE(review): presumably called by cyclone.sse.SSEHandler when the
        # client disconnects -- confirm.
        self.graphClients.removeSseHandler(self)
211 | |
if __name__ == '__main__':
    # Command-line entry point: parse options, optionally turn on verbose
    # logging, then serve the merged SSE streams on TCP port 9072.

    # NOTE(review): docopt usually needs two or more spaces between an option
    # and its description. The spacing in this usage text may have been
    # collapsed in this copy -- confirm that '-v' is parsed as a flag.
    arg = docopt("""
    Usage: sse_collector.py [options]

    -v Verbose
    """)

    if arg['-v']:
        # Send twisted's own log to stdout and show our debug messages.
        import twisted.python.log
        twisted.python.log.startLogging(sys.stdout)
        log.setLevel(logging.DEBUG)


    # One shared registry of requesters and upstream sources. Each SomeGraph
    # handler reads it back as self.settings.graphClients.
    graphClients = GraphClients()

    reactor.listenTCP(
        9072,
        cyclone.web.Application(
            handlers=[
                (r'/graph/(.*)', SomeGraph),
            ],
            graphClients=graphClients),
        interface='::')  # IPv6 wildcard; whether IPv4 is also accepted depends on OS settings
    reactor.run()