Mercurial > code > home > repos > homeauto
comparison service/reasoning/sse_collector.py @ 1103:b84e956771fc
sse_collector now kind of gets concurrent requests right
Ignore-this: e1a104d9ae81473b86fc12fbb8ac097b
darcs-hash:1bc1655b532074d97d7b8b7dd65802a9c62b6ff9
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 19 Aug 2016 22:37:01 -0700 |
parents | 2c7fd4e246ed |
children | 5084a1f719c9 |
comparison
equal
deleted
inserted
replaced
1102:06a511a96f20 | 1103:b84e956771fc |
---|---|
3 result of fetching multiple other SSE patch streams. The result stream | 3 result of fetching multiple other SSE patch streams. The result stream |
4 may include new statements injected by this service. | 4 may include new statements injected by this service. |
5 | 5 |
6 Future: | 6 Future: |
7 - filter out unneeded stmts from the sources | 7 - filter out unneeded stmts from the sources |
8 - give a time resolution and concatenate patches faster than that res | 8 - give a time resolution and concatenate any patches that come faster than that res |
9 """ | 9 """ |
10 | 10 |
11 config = { | 11 config = { |
12 'streams': [ | 12 'streams': [ |
13 {'id': 'home', | 13 {'id': 'home', |
20 } | 20 } |
21 | 21 |
22 from crochet import no_setup | 22 from crochet import no_setup |
23 no_setup() | 23 no_setup() |
24 | 24 |
25 import sys, logging, weakref, traceback, json | 25 import sys, logging, traceback, json, collections |
26 from twisted.internet import reactor | 26 from twisted.internet import reactor |
27 import cyclone.web, cyclone.sse | 27 import cyclone.web, cyclone.sse |
28 from rdflib import ConjunctiveGraph | 28 from rdflib import ConjunctiveGraph |
29 from rdflib.parser import StringInputSource | 29 from rdflib.parser import StringInputSource |
30 from docopt import docopt | 30 from docopt import docopt |
31 | 31 |
32 from twisted_sse_demo.eventsource import EventSource | 32 from twisted_sse_demo.eventsource import EventSource |
33 | 33 |
34 sys.path.append("../../lib") | 34 sys.path.append("../../lib") |
35 from logsetup import log | 35 from logsetup import log |
36 from patchablegraph import patchAsJson | 36 from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson |
37 | 37 |
38 sys.path.append("/my/proj/light9") | 38 sys.path.append("/my/proj/light9") |
39 from light9.rdfdb.patch import Patch | 39 from light9.rdfdb.patch import Patch |
40 | 40 |
41 def patchFromJson(j): | |
42 body = json.loads(j)['patch'] | |
43 a = ConjunctiveGraph() | |
44 a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld') | |
45 d = ConjunctiveGraph() | |
46 d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld') | |
47 return Patch(addGraph=a, delGraph=d) | |
48 | |
49 class PatchSource(object): | 41 class PatchSource(object): |
50 """wrap EventSource so it emits Patch objects and has an explicit stop method""" | 42 """wrap EventSource so it emits Patch objects and has an explicit stop method.""" |
51 def __init__(self, url): | 43 def __init__(self, url): |
52 self.url = url | 44 self.url = url |
53 self._listeners = set()#weakref.WeakSet() | 45 self._listeners = set() |
54 log.info('start read from %s', url) | 46 log.info('start read from %s', url) |
55 self._eventSource = EventSource(url) | 47 self._eventSource = EventSource(url) |
56 self._eventSource.protocol.delimiter = '\n' | 48 self._eventSource.protocol.delimiter = '\n' |
57 | 49 |
58 self._eventSource.addEventListener('fullGraph', self._onFullGraph) | 50 self._eventSource.addEventListener('fullGraph', self._onFullGraph) |
63 g = ConjunctiveGraph() | 55 g = ConjunctiveGraph() |
64 g.parse(StringInputSource(message), format='json-ld') | 56 g.parse(StringInputSource(message), format='json-ld') |
65 p = Patch(addGraph=g) | 57 p = Patch(addGraph=g) |
66 self._sendPatch(p) | 58 self._sendPatch(p) |
67 except: | 59 except: |
68 traceback.print_exc() | 60 log.error(traceback.format_exc()) |
61 raise | |
69 | 62 |
70 def _onMessage(self, message): | 63 def _onMessage(self, message): |
71 try: | 64 try: |
72 p = patchFromJson(message) | 65 p = patchFromJson(message) |
73 self._sendPatch(p) | 66 self._sendPatch(p) |
74 except: | 67 except: |
75 traceback.print_exc() | 68 log.error(traceback.format_exc()) |
69 raise | |
76 | 70 |
77 def _sendPatch(self, p): | 71 def _sendPatch(self, p): |
78 log.info('output patch to %s listeners', p, len(self._listeners)) | 72 log.debug('PatchSource received patch %s', p.shortSummary()) |
79 for lis in self._listeners: | 73 for lis in self._listeners: |
80 lis(p) | 74 lis(p) |
81 | 75 |
82 def addPatchListener(self, func): | 76 def addPatchListener(self, func): |
83 self._listeners.add(func) | 77 self._listeners.add(func) |
84 | 78 |
85 def stop(self): | 79 def stop(self): |
86 log.info('stop read from %s', self.url) | 80 log.info('stop read from %s', self.url) |
87 self._eventSource.protocol.stopProducing() #? | 81 try: |
82 self._eventSource.protocol.stopProducing() #? | |
83 except AttributeError: | |
84 pass | |
88 self._eventSource = None | 85 self._eventSource = None |
89 | 86 |
90 def __del__(self): | 87 def __del__(self): |
91 if self._eventSource: | 88 if self._eventSource: |
92 raise ValueError | 89 raise ValueError |
93 | 90 |
94 class GraphClient(object): | 91 class GraphClient(object): |
95 """A listener of some EventSources that sends patches to one of our clients.""" | 92 """A listener of some PatchSources that emits patches to a cyclone SSEHandler.""" |
96 | 93 |
97 def __init__(self, handler, patchSources): | 94 def __init__(self, handler): |
98 self.handler = handler | 95 self.handler = handler |
99 | 96 |
100 for ps in patchSources: | 97 # The graph that the requester knows. |
101 ps.addPatchListener(self.onPatch) | 98 # |
102 | 99 # Note that often, 2 requests for the same streamId would have |
103 def onPatch(self, p): | 100 # the same graph contents in this attribute and ought to share |
104 self.handler.sendEvent(message=patchAsJson(p), event='patch') | 101 # it. But, that's a little harder to write, and if clients |
102 # want different throttling rates or have stalled different | |
103 # amounts, their currentGraph contents might drift apart | |
104 # temporarily. | |
105 self._currentGraph = PatchableGraph() | |
106 self._currentGraph.addObserver(self._sendPatch) | |
107 | |
108 def addPatchSource(self, ps): | |
109 """Connect this object to a PatchSource whose patches should get applied to our output graph""" | |
110 # this is never getting released, so we'll keep sending until | |
111 # no one wants the source anymore. | |
112 ps.addPatchListener(self._onPatch) | |
113 | |
114 def _onPatch(self, p): | |
115 self._currentGraph.patch(p) | |
116 | |
117 def _sendPatch(self, jsonPatch): | |
118 self.handler.sendEvent(message=jsonPatch, event='patch') | |
105 | 119 |
106 class GraphClients(object): | 120 class GraphClients(object): |
107 """All the active EventClient objects""" | 121 """ |
122 All the active GraphClient objects | |
123 | |
124 To handle all the overlapping-statement cases, we store a set of | |
125 true statements along with the sources that are currently | |
126 asserting them and the requesters who currently know them. As | |
127 statements come and go, we make patches to send to requesters. | |
128 | |
129 todo: reconnect patchsources that go down and deal with their graph diffs | |
130 """ | |
108 def __init__(self): | 131 def __init__(self): |
109 self.clients = {} # url: EventClient | 132 self.clients = {} # url: PatchSource |
110 self.listeners = {} # url: [GraphClient] | 133 self.handlers = set() # handler |
111 | 134 self.listeners = {} # url: [handler] (handler may appear under multiple urls) |
135 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` | |
136 | |
112 def addSseHandler(self, handler, streamId): | 137 def addSseHandler(self, handler, streamId): |
113 log.info('addSseHandler %r %r', handler, streamId) | 138 log.info('addSseHandler %r %r', handler, streamId) |
114 matches = [s for s in config['streams'] if s['id'] == streamId] | 139 matches = [s for s in config['streams'] if s['id'] == streamId] |
115 if len(matches) != 1: | 140 if len(matches) != 1: |
116 raise ValueError("%s matches for %r" % (len(matches), streamId)) | 141 raise ValueError("%s matches for %r" % (len(matches), streamId)) |
117 ecs = [] | 142 |
143 self.handlers.add(handler) | |
118 for source in matches[0]['sources']: | 144 for source in matches[0]['sources']: |
119 if source not in self.clients: | 145 if source not in self.clients: |
120 self.clients[source] = PatchSource(source) | 146 ps = self.clients[source] = PatchSource(source) |
121 ecs.append(self.clients[source]) | 147 ps.addPatchListener(lambda p, source=source: self._onPatch(source, p)) |
148 self.listeners.setdefault(source, []).append(handler) | |
149 self._sendUpdatePatch(handler) | |
150 | |
151 def _onPatch(self, source, p): | |
152 | |
153 for stmt in p.addQuads: | |
154 sourceUrls, handlers = self.statements[stmt] | |
155 if source in sourceUrls: | |
156 raise ValueError("%s added stmt that it already had: %s" % (source, stmt)) | |
157 sourceUrls.add(source) | |
158 for stmt in p.delQuads: | |
159 sourceUrls, handlers = self.statements[stmt] | |
160 if source not in sourceUrls: | |
161 raise ValueError("%s deleting stmt that it didn't have: %s" % (source, stmt)) | |
162 sourceUrls.remove(source) | |
163 | |
164 for h in self.handlers: | |
165 self._sendUpdatePatch(h) | |
122 | 166 |
123 self.listeners.setdefault(source, []).append(GraphClient(handler, ecs)) | 167 def _sendUpdatePatch(self, handler): |
124 print self.__dict__ | 168 """send a patch event out this handler to bring it up to date with self.statements""" |
169 adds = [] | |
170 dels = [] | |
171 statementsToClear = [] | |
172 for stmt, (sources, handlers) in self.statements.iteritems(): | |
173 if sources and (handler not in handlers): | |
174 adds.append(stmt) | |
175 handlers.add(handler) | |
176 if not sources and (handler in handlers): | |
177 dels.append(stmt) | |
178 handlers.remove(handler) | |
179 statementsToClear.append(stmt) | |
180 # todo: cleanup statementsToClear | |
181 p = Patch(addQuads=adds, delQuads=dels) | |
182 if not p.isNoop(): | |
183 log.debug("send patch %s to %s", p.shortSummary(), handler) | |
184 handler.sendEvent(message=jsonFromPatch(p), event='patch') | |
125 | 185 |
126 def removeSseHandler(self, handler): | 186 def removeSseHandler(self, handler): |
127 log.info('removeSseHandler %r', handler) | 187 log.info('removeSseHandler %r', handler) |
128 for url, graphClients in self.listeners.items(): | 188 for url, handlers in self.listeners.items(): |
129 keep = [] | 189 keep = [] |
130 for gc in graphClients: | 190 for h in handlers: |
131 if gc.handler != handler: | 191 if h != handler: |
132 keep.append(gc) | 192 keep.append(h) |
133 graphClients[:] = keep | 193 handlers[:] = keep |
134 if not keep: | 194 if not keep: |
135 self.clients[url].stop() | 195 self.clients[url].stop() |
136 del self.clients[url] | 196 del self.clients[url] |
137 del self.listeners[url] | 197 del self.listeners[url] |
198 self.handlers.remove(handler) | |
138 | 199 |
139 class SomeGraph(cyclone.sse.SSEHandler): | 200 class SomeGraph(cyclone.sse.SSEHandler): |
140 def __init__(self, application, request): | 201 def __init__(self, application, request): |
141 cyclone.sse.SSEHandler.__init__(self, application, request) | 202 cyclone.sse.SSEHandler.__init__(self, application, request) |
142 self.id = request.uri[len('/graph/'):] | 203 self.id = request.uri[len('/graph/'):] |