296
|
1 """
|
|
2 requesting /graph/foo returns an SSE patch stream that's the
|
|
3 result of fetching multiple other SSE patch streams. The result stream
|
|
4 may include new statements injected by this service.
|
|
5
|
|
6 Future:
|
|
7 - filter out unneeded stmts from the sources
|
|
8 - give a time resolution and concatenate patches faster than that res
|
|
9 """
|
|
10
|
|
11 config = {
|
|
12 'streams': [
|
|
13 {'id': 'home',
|
|
14 'sources': [
|
|
15 #'http://bang:9059/graph/events',
|
|
16 'http://plus:9075/graph/events',
|
|
17 ]
|
|
18 },
|
|
19 ]
|
|
20 }
|
|
21
|
|
22 from crochet import no_setup
|
|
23 no_setup()
|
|
24
|
|
25 import sys, logging, weakref, traceback, json
|
|
26 from twisted.internet import reactor
|
|
27 import cyclone.web, cyclone.sse
|
|
28 from rdflib import ConjunctiveGraph
|
|
29 from rdflib.parser import StringInputSource
|
|
30 from docopt import docopt
|
|
31
|
|
32 from twisted_sse_demo.eventsource import EventSource
|
|
33
|
|
34 sys.path.append("../../lib")
|
|
35 from logsetup import log
|
|
36 from patchablegraph import patchAsJson
|
|
37
|
|
38 sys.path.append("/my/proj/light9")
|
|
39 from light9.rdfdb.patch import Patch
|
|
40
|
|
41 def patchFromJson(j):
|
|
42 body = json.loads(j)['patch']
|
|
43 a = ConjunctiveGraph()
|
|
44 a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld')
|
|
45 d = ConjunctiveGraph()
|
|
46 d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld')
|
|
47 return Patch(addGraph=a, delGraph=d)
|
|
48
|
|
49 class PatchSource(object):
|
|
50 """wrap EventSource so it emits Patch objects and has an explicit stop method"""
|
|
51 def __init__(self, url):
|
|
52 self.url = url
|
|
53 self._listeners = set()#weakref.WeakSet()
|
|
54 log.info('start read from %s', url)
|
|
55 self._eventSource = EventSource(url)
|
|
56 self._eventSource.protocol.delimiter = '\n'
|
|
57
|
|
58 self._eventSource.addEventListener('fullGraph', self._onFullGraph)
|
|
59 self._eventSource.addEventListener('patch', self._onMessage)
|
|
60
|
|
61 def _onFullGraph(self, message):
|
|
62 try:
|
|
63 g = ConjunctiveGraph()
|
|
64 g.parse(StringInputSource(message), format='json-ld')
|
|
65 p = Patch(addGraph=g)
|
|
66 self._sendPatch(p)
|
|
67 except:
|
|
68 traceback.print_exc()
|
|
69
|
|
70 def _onMessage(self, message):
|
|
71 try:
|
|
72 p = patchFromJson(message)
|
|
73 self._sendPatch(p)
|
|
74 except:
|
|
75 traceback.print_exc()
|
|
76
|
|
77 def _sendPatch(self, p):
|
|
78 log.info('output patch to %s listeners', p, len(self._listeners))
|
|
79 for lis in self._listeners:
|
|
80 lis(p)
|
|
81
|
|
82 def addPatchListener(self, func):
|
|
83 self._listeners.add(func)
|
|
84
|
|
85 def stop(self):
|
|
86 log.info('stop read from %s', self.url)
|
|
87 self._eventSource.protocol.stopProducing() #?
|
|
88 self._eventSource = None
|
|
89
|
|
90 def __del__(self):
|
|
91 if self._eventSource:
|
|
92 raise ValueError
|
|
93
|
|
94 class GraphClient(object):
|
|
95 """A listener of some EventSources that sends patches to one of our clients."""
|
|
96
|
|
97 def __init__(self, handler, patchSources):
|
|
98 self.handler = handler
|
|
99
|
|
100 for ps in patchSources:
|
|
101 ps.addPatchListener(self.onPatch)
|
|
102
|
|
103 def onPatch(self, p):
|
|
104 self.handler.sendEvent(message=patchAsJson(p), event='patch')
|
|
105
|
|
106 class GraphClients(object):
|
|
107 """All the active EventClient objects"""
|
|
108 def __init__(self):
|
|
109 self.clients = {} # url: EventClient
|
|
110 self.listeners = {} # url: [GraphClient]
|
|
111
|
|
112 def addSseHandler(self, handler, streamId):
|
|
113 log.info('addSseHandler %r %r', handler, streamId)
|
|
114 matches = [s for s in config['streams'] if s['id'] == streamId]
|
|
115 if len(matches) != 1:
|
|
116 raise ValueError("%s matches for %r" % (len(matches), streamId))
|
|
117 ecs = []
|
|
118 for source in matches[0]['sources']:
|
|
119 if source not in self.clients:
|
|
120 self.clients[source] = PatchSource(source)
|
|
121 ecs.append(self.clients[source])
|
|
122
|
|
123 self.listeners.setdefault(source, []).append(GraphClient(handler, ecs))
|
|
124 print self.__dict__
|
|
125
|
|
126 def removeSseHandler(self, handler):
|
|
127 log.info('removeSseHandler %r', handler)
|
|
128 for url, graphClients in self.listeners.items():
|
|
129 keep = []
|
|
130 for gc in graphClients:
|
|
131 if gc.handler != handler:
|
|
132 keep.append(gc)
|
|
133 graphClients[:] = keep
|
|
134 if not keep:
|
|
135 self.clients[url].stop()
|
|
136 del self.clients[url]
|
|
137 del self.listeners[url]
|
|
138
|
|
139 class SomeGraph(cyclone.sse.SSEHandler):
|
|
140 def __init__(self, application, request):
|
|
141 cyclone.sse.SSEHandler.__init__(self, application, request)
|
|
142 self.id = request.uri[len('/graph/'):]
|
|
143 self.graphClients = self.settings.graphClients
|
|
144
|
|
145 def bind(self):
|
|
146 self.graphClients.addSseHandler(self, self.id)
|
|
147
|
|
148 def unbind(self):
|
|
149 self.graphClients.removeSseHandler(self)
|
|
150
|
|
151 if __name__ == '__main__':
|
|
152
|
|
153 arg = docopt("""
|
|
154 Usage: sse_collector.py [options]
|
|
155
|
|
156 -v Verbose
|
|
157 """)
|
|
158
|
|
159 if arg['-v']:
|
|
160 import twisted.python.log
|
|
161 twisted.python.log.startLogging(sys.stdout)
|
|
162 log.setLevel(logging.DEBUG)
|
|
163
|
|
164
|
|
165 graphClients = GraphClients()
|
|
166
|
|
167 reactor.listenTCP(
|
|
168 9071,
|
|
169 cyclone.web.Application(
|
|
170 handlers=[
|
|
171 (r'/graph/(.*)', SomeGraph),
|
|
172 ],
|
|
173 graphClients=graphClients),
|
|
174 interface='::')
|
|
175 reactor.run()
|