comparison service/reasoning/sse_collector.py @ 1103:b84e956771fc

sse_collector now kind of gets concurrent requests right Ignore-this: e1a104d9ae81473b86fc12fbb8ac097b darcs-hash:1bc1655b532074d97d7b8b7dd65802a9c62b6ff9
author drewp <drewp@bigasterisk.com>
date Fri, 19 Aug 2016 22:37:01 -0700
parents 2c7fd4e246ed
children 5084a1f719c9
comparison
equal deleted inserted replaced
1102:06a511a96f20 1103:b84e956771fc
3 result of fetching multiple other SSE patch streams. The result stream 3 result of fetching multiple other SSE patch streams. The result stream
4 may include new statements injected by this service. 4 may include new statements injected by this service.
5 5
6 Future: 6 Future:
7 - filter out unneeded stmts from the sources 7 - filter out unneeded stmts from the sources
8 - give a time resolution and concatenate patches faster than that res 8 - give a time resolution and concatenate any patches that come faster than that res
9 """ 9 """
10 10
11 config = { 11 config = {
12 'streams': [ 12 'streams': [
13 {'id': 'home', 13 {'id': 'home',
20 } 20 }
21 21
22 from crochet import no_setup 22 from crochet import no_setup
23 no_setup() 23 no_setup()
24 24
25 import sys, logging, weakref, traceback, json 25 import sys, logging, traceback, json, collections
26 from twisted.internet import reactor 26 from twisted.internet import reactor
27 import cyclone.web, cyclone.sse 27 import cyclone.web, cyclone.sse
28 from rdflib import ConjunctiveGraph 28 from rdflib import ConjunctiveGraph
29 from rdflib.parser import StringInputSource 29 from rdflib.parser import StringInputSource
30 from docopt import docopt 30 from docopt import docopt
31 31
32 from twisted_sse_demo.eventsource import EventSource 32 from twisted_sse_demo.eventsource import EventSource
33 33
34 sys.path.append("../../lib") 34 sys.path.append("../../lib")
35 from logsetup import log 35 from logsetup import log
36 from patchablegraph import patchAsJson 36 from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson
37 37
38 sys.path.append("/my/proj/light9") 38 sys.path.append("/my/proj/light9")
39 from light9.rdfdb.patch import Patch 39 from light9.rdfdb.patch import Patch
40 40
41 def patchFromJson(j):
42 body = json.loads(j)['patch']
43 a = ConjunctiveGraph()
44 a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld')
45 d = ConjunctiveGraph()
46 d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld')
47 return Patch(addGraph=a, delGraph=d)
48
49 class PatchSource(object): 41 class PatchSource(object):
50 """wrap EventSource so it emits Patch objects and has an explicit stop method""" 42 """wrap EventSource so it emits Patch objects and has an explicit stop method."""
51 def __init__(self, url): 43 def __init__(self, url):
52 self.url = url 44 self.url = url
53 self._listeners = set()#weakref.WeakSet() 45 self._listeners = set()
54 log.info('start read from %s', url) 46 log.info('start read from %s', url)
55 self._eventSource = EventSource(url) 47 self._eventSource = EventSource(url)
56 self._eventSource.protocol.delimiter = '\n' 48 self._eventSource.protocol.delimiter = '\n'
57 49
58 self._eventSource.addEventListener('fullGraph', self._onFullGraph) 50 self._eventSource.addEventListener('fullGraph', self._onFullGraph)
63 g = ConjunctiveGraph() 55 g = ConjunctiveGraph()
64 g.parse(StringInputSource(message), format='json-ld') 56 g.parse(StringInputSource(message), format='json-ld')
65 p = Patch(addGraph=g) 57 p = Patch(addGraph=g)
66 self._sendPatch(p) 58 self._sendPatch(p)
67 except: 59 except:
68 traceback.print_exc() 60 log.error(traceback.format_exc())
61 raise
69 62
70 def _onMessage(self, message): 63 def _onMessage(self, message):
71 try: 64 try:
72 p = patchFromJson(message) 65 p = patchFromJson(message)
73 self._sendPatch(p) 66 self._sendPatch(p)
74 except: 67 except:
75 traceback.print_exc() 68 log.error(traceback.format_exc())
69 raise
76 70
77 def _sendPatch(self, p): 71 def _sendPatch(self, p):
78 log.info('output patch to %s listeners', p, len(self._listeners)) 72 log.debug('PatchSource received patch %s', p.shortSummary())
79 for lis in self._listeners: 73 for lis in self._listeners:
80 lis(p) 74 lis(p)
81 75
82 def addPatchListener(self, func): 76 def addPatchListener(self, func):
83 self._listeners.add(func) 77 self._listeners.add(func)
84 78
85 def stop(self): 79 def stop(self):
86 log.info('stop read from %s', self.url) 80 log.info('stop read from %s', self.url)
87 self._eventSource.protocol.stopProducing() #? 81 try:
82 self._eventSource.protocol.stopProducing() #?
83 except AttributeError:
84 pass
88 self._eventSource = None 85 self._eventSource = None
89 86
90 def __del__(self): 87 def __del__(self):
91 if self._eventSource: 88 if self._eventSource:
92 raise ValueError 89 raise ValueError
93 90
94 class GraphClient(object): 91 class GraphClient(object):
95 """A listener of some EventSources that sends patches to one of our clients.""" 92 """A listener of some PatchSources that emits patches to a cyclone SSEHandler."""
96 93
97 def __init__(self, handler, patchSources): 94 def __init__(self, handler):
98 self.handler = handler 95 self.handler = handler
99 96
100 for ps in patchSources: 97 # The graph that the requester knows.
101 ps.addPatchListener(self.onPatch) 98 #
102 99 # Note that often, 2 requests for the same streamId would have
103 def onPatch(self, p): 100 # the same graph contents in this attribute and ought to share
104 self.handler.sendEvent(message=patchAsJson(p), event='patch') 101 # it. But, that's a little harder to write, and if clients
102 # want different throttling rates or have stalled different
103 # amounts, their currentGraph contents might drift apart
104 # temporarily.
105 self._currentGraph = PatchableGraph()
106 self._currentGraph.addObserver(self._sendPatch)
107
108 def addPatchSource(self, ps):
109 """Connect this object to a PatchSource whose patches should get applied to our output graph"""
110 # this is never getting released, so we'll keep sending until
111 # no one wants the source anymore.
112 ps.addPatchListener(self._onPatch)
113
114 def _onPatch(self, p):
115 self._currentGraph.patch(p)
116
117 def _sendPatch(self, jsonPatch):
118 self.handler.sendEvent(message=jsonPatch, event='patch')
105 119
106 class GraphClients(object): 120 class GraphClients(object):
107 """All the active EventClient objects""" 121 """
122 All the active GraphClient objects
123
124 To handle all the overlapping-statement cases, we store a set of
125 true statements along with the sources that are currently
126 asserting them and the requesters who currently know them. As
127 statements come and go, we make patches to send to requesters.
128
129 todo: reconnect patchsources that go down and deal with their graph diffs
130 """
108 def __init__(self): 131 def __init__(self):
109 self.clients = {} # url: EventClient 132 self.clients = {} # url: PatchSource
110 self.listeners = {} # url: [GraphClient] 133 self.handlers = set() # handler
111 134 self.listeners = {} # url: [handler] (handler may appear under multiple urls)
135 self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
136
112 def addSseHandler(self, handler, streamId): 137 def addSseHandler(self, handler, streamId):
113 log.info('addSseHandler %r %r', handler, streamId) 138 log.info('addSseHandler %r %r', handler, streamId)
114 matches = [s for s in config['streams'] if s['id'] == streamId] 139 matches = [s for s in config['streams'] if s['id'] == streamId]
115 if len(matches) != 1: 140 if len(matches) != 1:
116 raise ValueError("%s matches for %r" % (len(matches), streamId)) 141 raise ValueError("%s matches for %r" % (len(matches), streamId))
117 ecs = [] 142
143 self.handlers.add(handler)
118 for source in matches[0]['sources']: 144 for source in matches[0]['sources']:
119 if source not in self.clients: 145 if source not in self.clients:
120 self.clients[source] = PatchSource(source) 146 ps = self.clients[source] = PatchSource(source)
121 ecs.append(self.clients[source]) 147 ps.addPatchListener(lambda p, source=source: self._onPatch(source, p))
148 self.listeners.setdefault(source, []).append(handler)
149 self._sendUpdatePatch(handler)
150
151 def _onPatch(self, source, p):
152
153 for stmt in p.addQuads:
154 sourceUrls, handlers = self.statements[stmt]
155 if source in sourceUrls:
156 raise ValueError("%s added stmt that it already had: %s" % (source, stmt))
157 sourceUrls.add(source)
158 for stmt in p.delQuads:
159 sourceUrls, handlers = self.statements[stmt]
160 if source not in sourceUrls:
161 raise ValueError("%s deleting stmt that it didn't have: %s" % (source, stmt))
162 sourceUrls.remove(source)
163
164 for h in self.handlers:
165 self._sendUpdatePatch(h)
122 166
123 self.listeners.setdefault(source, []).append(GraphClient(handler, ecs)) 167 def _sendUpdatePatch(self, handler):
124 print self.__dict__ 168 """send a patch event out this handler to bring it up to date with self.statements"""
169 adds = []
170 dels = []
171 statementsToClear = []
172 for stmt, (sources, handlers) in self.statements.iteritems():
173 if sources and (handler not in handlers):
174 adds.append(stmt)
175 handlers.add(handler)
176 if not sources and (handler in handlers):
177 dels.append(stmt)
178 handlers.remove(handler)
179 statementsToClear.append(stmt)
180 # todo: cleanup statementsToClear
181 p = Patch(addQuads=adds, delQuads=dels)
182 if not p.isNoop():
183 log.debug("send patch %s to %s", p.shortSummary(), handler)
184 handler.sendEvent(message=jsonFromPatch(p), event='patch')
125 185
126 def removeSseHandler(self, handler): 186 def removeSseHandler(self, handler):
127 log.info('removeSseHandler %r', handler) 187 log.info('removeSseHandler %r', handler)
128 for url, graphClients in self.listeners.items(): 188 for url, handlers in self.listeners.items():
129 keep = [] 189 keep = []
130 for gc in graphClients: 190 for h in handlers:
131 if gc.handler != handler: 191 if h != handler:
132 keep.append(gc) 192 keep.append(h)
133 graphClients[:] = keep 193 handlers[:] = keep
134 if not keep: 194 if not keep:
135 self.clients[url].stop() 195 self.clients[url].stop()
136 del self.clients[url] 196 del self.clients[url]
137 del self.listeners[url] 197 del self.listeners[url]
198 self.handlers.remove(handler)
138 199
139 class SomeGraph(cyclone.sse.SSEHandler): 200 class SomeGraph(cyclone.sse.SSEHandler):
140 def __init__(self, application, request): 201 def __init__(self, application, request):
141 cyclone.sse.SSEHandler.__init__(self, application, request) 202 cyclone.sse.SSEHandler.__init__(self, application, request)
142 self.id = request.uri[len('/graph/'):] 203 self.id = request.uri[len('/graph/'):]