changeset 296:233b81cf2712

start sse_collector Ignore-this: eba53ef3b8b7b34089e018595c41d202
author drewp@bigasterisk.com
date Fri, 19 Aug 2016 10:54:38 -0700
parents d25ac47f4820
children 6cde6131f2c0
files service/reasoning/requirements.txt service/reasoning/sse_collector.py
diffstat 2 files changed, 176 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/requirements.txt	Fri Aug 19 10:53:36 2016 -0700
+++ b/service/reasoning/requirements.txt	Fri Aug 19 10:54:38 2016 -0700
@@ -2,5 +2,4 @@
 ipdb==0.7
 docopt==0.6.2
 rdflib==4.2.1
-git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi
-git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
+git+http://github.com/drewp/FuXi.git@22263b0751a29839013ce43646dd18302c7b9bb1#egg=FuXi
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/sse_collector.py	Fri Aug 19 10:54:38 2016 -0700
@@ -0,0 +1,175 @@
+"""
+requesting /graph/foo returns an SSE patch stream that's the
+result of fetching multiple other SSE patch streams. The result stream
+may include new statements injected by this service.
+
+Future:
+- filter out unneeded stmts from the sources
+- give a time resolution and concatenate patches faster than that res
+"""
+
+config = {
+    'streams': [
+        {'id': 'home',
+         'sources': [
+             #'http://bang:9059/graph/events',
+             'http://plus:9075/graph/events',     
+         ]
+     },
+    ]
+}
+
+from crochet import no_setup
+no_setup()
+
+import sys, logging, weakref, traceback, json
+from twisted.internet import reactor
+import cyclone.web, cyclone.sse
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+from docopt import docopt
+
+from twisted_sse_demo.eventsource import EventSource
+
+sys.path.append("../../lib")
+from logsetup import log
+from patchablegraph import patchAsJson
+
+sys.path.append("/my/proj/light9")
+from light9.rdfdb.patch import Patch
+
+def patchFromJson(j):
+    body = json.loads(j)['patch']
+    a = ConjunctiveGraph()
+    a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld')
+    d = ConjunctiveGraph()
+    d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld')
+    return Patch(addGraph=a, delGraph=d)
+
+class PatchSource(object):
+    """wrap EventSource so it emits Patch objects and has an explicit stop method"""
+    def __init__(self, url):
+        self.url = url
+        self._listeners = set()#weakref.WeakSet()
+        log.info('start read from %s', url)
+        self._eventSource = EventSource(url)
+        self._eventSource.protocol.delimiter = '\n'
+
+        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
+        self._eventSource.addEventListener('patch', self._onMessage)
+
+    def _onFullGraph(self, message):
+        try:
+            g = ConjunctiveGraph()
+            g.parse(StringInputSource(message), format='json-ld')
+            p = Patch(addGraph=g)
+            self._sendPatch(p)
+        except:
+            traceback.print_exc()
+            
+    def _onMessage(self, message):
+        try:
+            p = patchFromJson(message)
+            self._sendPatch(p)
+        except:
+            traceback.print_exc()
+
+    def _sendPatch(self, p):
+        log.info('output patch to %s listeners', p, len(self._listeners))
+        for lis in self._listeners:
+            lis(p)
+        
+    def addPatchListener(self, func):
+        self._listeners.add(func)
+
+    def stop(self):
+        log.info('stop read from %s', self.url)
+        self._eventSource.protocol.stopProducing() #?
+        self._eventSource = None
+
+    def __del__(self):
+        if self._eventSource:
+            raise ValueError
+
+class GraphClient(object):
+    """A listener of some EventSources that sends patches to one of our clients."""
+
+    def __init__(self, handler, patchSources):
+        self.handler = handler
+
+        for ps in patchSources:
+            ps.addPatchListener(self.onPatch)
+
+    def onPatch(self, p):
+        self.handler.sendEvent(message=patchAsJson(p), event='patch')
+    
+class GraphClients(object):
+    """All the active EventClient objects"""
+    def __init__(self):
+        self.clients = {}  # url: EventClient
+        self.listeners = {}  # url: [GraphClient]
+
+    def addSseHandler(self, handler, streamId):
+        log.info('addSseHandler %r %r', handler, streamId)
+        matches = [s for s in config['streams'] if s['id'] == streamId]
+        if len(matches) != 1:
+            raise ValueError("%s matches for %r" % (len(matches), streamId))
+        ecs = []
+        for source in matches[0]['sources']:
+            if source not in self.clients:
+                self.clients[source] = PatchSource(source)
+            ecs.append(self.clients[source])
+            
+        self.listeners.setdefault(source, []).append(GraphClient(handler, ecs))
+        print self.__dict__
+        
+    def removeSseHandler(self, handler):
+        log.info('removeSseHandler %r', handler)
+        for url, graphClients in self.listeners.items():
+            keep = []
+            for gc in graphClients:
+                if gc.handler != handler:
+                    keep.append(gc)
+            graphClients[:] = keep
+            if not keep:
+                self.clients[url].stop()
+                del self.clients[url]
+                del self.listeners[url]
+    
+class SomeGraph(cyclone.sse.SSEHandler):
+    def __init__(self, application, request):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.id = request.uri[len('/graph/'):]
+        self.graphClients = self.settings.graphClients
+        
+    def bind(self):
+        self.graphClients.addSseHandler(self, self.id)
+        
+    def unbind(self):
+        self.graphClients.removeSseHandler(self)
+
+if __name__ == '__main__':
+
+    arg = docopt("""
+    Usage: sse_collector.py [options]
+
+    -v   Verbose
+    """)
+    
+    if arg['-v']:
+        import twisted.python.log
+        twisted.python.log.startLogging(sys.stdout)
+        log.setLevel(logging.DEBUG)
+
+
+    graphClients = GraphClients()
+        
+    reactor.listenTCP(
+        9071,
+        cyclone.web.Application(
+            handlers=[
+                (r'/graph/(.*)', SomeGraph),
+            ],
+            graphClients=graphClients),
+        interface='::')
+    reactor.run()