changeset 1103:b84e956771fc

sse_collector now kind of gets concurrent requests right Ignore-this: e1a104d9ae81473b86fc12fbb8ac097b darcs-hash:1bc1655b532074d97d7b8b7dd65802a9c62b6ff9
author drewp <drewp@bigasterisk.com>
date Fri, 19 Aug 2016 22:37:01 -0700
parents 06a511a96f20
children 78f5fdec0c75
files lib/patchablegraph.py service/reasoning/sse_collector.py service/reasoning/twisted_sse_demo/eventsource.py
diffstat 3 files changed, 111 insertions(+), 39 deletions(-) [+]
line wrap: on
line diff
--- a/lib/patchablegraph.py	Fri Aug 19 10:59:39 2016 -0700
+++ b/lib/patchablegraph.py	Fri Aug 19 22:37:01 2016 -0700
@@ -26,7 +26,9 @@
 from light9.rdfdb.grapheditapi import GraphEditApi
 from rdflib import ConjunctiveGraph
 from light9.rdfdb.rdflibpatch import patchQuads
+from light9.rdfdb.patch import Patch
 from rdflib_jsonld.serializer import from_rdf
+from rdflib.parser import StringInputSource
 from cycloneerr import PrettyErrorHandler
 
 log = logging.getLogger('patchablegraph')
@@ -51,11 +53,21 @@
         #g.store.add((s,p,o), c) # no effect on nquad output
     return g
 
-def patchAsJson(p):
+def jsonFromPatch(p):
     return json.dumps({'patch': {
         'adds': from_rdf(_graphFromQuads2(p.addQuads)),
         'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
     }})
+patchAsJson = jsonFromPatch # deprecated name
+
+    
+def patchFromJson(j):
+    body = json.loads(j)['patch']
+    a = ConjunctiveGraph()
+    a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld')
+    d = ConjunctiveGraph()
+    d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld')
+    return Patch(addGraph=a, delGraph=d)
 
 def graphAsJson(g):
     # This is not the same as g.serialize(format='json-ld')! That
--- a/service/reasoning/sse_collector.py	Fri Aug 19 10:59:39 2016 -0700
+++ b/service/reasoning/sse_collector.py	Fri Aug 19 22:37:01 2016 -0700
@@ -5,7 +5,7 @@
 
 Future:
 - filter out unneeded stmts from the sources
-- give a time resolution and concatenate patches faster than that res
+- give a time resolution and concatenate any patches that come faster than that res
 """
 
 config = {
@@ -22,7 +22,7 @@
 from crochet import no_setup
 no_setup()
 
-import sys, logging, weakref, traceback, json
+import sys, logging, traceback, json, collections
 from twisted.internet import reactor
 import cyclone.web, cyclone.sse
 from rdflib import ConjunctiveGraph
@@ -33,24 +33,16 @@
 
 sys.path.append("../../lib")
 from logsetup import log
-from patchablegraph import patchAsJson
+from patchablegraph import jsonFromPatch, PatchableGraph, patchFromJson
 
 sys.path.append("/my/proj/light9")
 from light9.rdfdb.patch import Patch
 
-def patchFromJson(j):
-    body = json.loads(j)['patch']
-    a = ConjunctiveGraph()
-    a.parse(StringInputSource(json.dumps(body['adds'])), format='json-ld')
-    d = ConjunctiveGraph()
-    d.parse(StringInputSource(json.dumps(body['deletes'])), format='json-ld')
-    return Patch(addGraph=a, delGraph=d)
-
 class PatchSource(object):
-    """wrap EventSource so it emits Patch objects and has an explicit stop method"""
+    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
     def __init__(self, url):
         self.url = url
-        self._listeners = set()#weakref.WeakSet()
+        self._listeners = set()
         log.info('start read from %s', url)
         self._eventSource = EventSource(url)
         self._eventSource.protocol.delimiter = '\n'
@@ -65,17 +57,19 @@
             p = Patch(addGraph=g)
             self._sendPatch(p)
         except:
-            traceback.print_exc()
+            log.error(traceback.format_exc())
+            raise
             
     def _onMessage(self, message):
         try:
             p = patchFromJson(message)
             self._sendPatch(p)
         except:
-            traceback.print_exc()
+            log.error(traceback.format_exc())
+            raise
 
     def _sendPatch(self, p):
-        log.info('output patch to %s listeners', p, len(self._listeners))
+        log.debug('PatchSource received patch %s', p.shortSummary())
         for lis in self._listeners:
             lis(p)
         
@@ -84,7 +78,10 @@
 
     def stop(self):
         log.info('stop read from %s', self.url)
-        self._eventSource.protocol.stopProducing() #?
+        try:
+            self._eventSource.protocol.stopProducing() #?
+        except AttributeError:
+            pass
         self._eventSource = None
 
     def __del__(self):
@@ -92,49 +89,113 @@
             raise ValueError
 
 class GraphClient(object):
-    """A listener of some EventSources that sends patches to one of our clients."""
+    """A listener of some PatchSources that emits patches to a cyclone SSEHandler."""
 
-    def __init__(self, handler, patchSources):
+    def __init__(self, handler):
         self.handler = handler
 
-        for ps in patchSources:
-            ps.addPatchListener(self.onPatch)
+        # The graph that the requester knows.
+        # 
+        # Note that often, 2 requests for the same streamId would have
+        # the same graph contents in this attribute and ought to share
+        # it. But, that's a little harder to write, and if clients
+        # want different throttling rates or have stalled different
+        # amounts, their currentGraph contents might drift apart
+        # temporarily.
+        self._currentGraph = PatchableGraph()
+        self._currentGraph.addObserver(self._sendPatch)
 
-    def onPatch(self, p):
-        self.handler.sendEvent(message=patchAsJson(p), event='patch')
+    def addPatchSource(self, ps):
+        """Connect this object to a PatchSource whose patches should get applied to our output graph"""
+        # this is never getting released, so we'll keep sending until
+        # no one wants the source anymore.
+        ps.addPatchListener(self._onPatch)
+
+    def _onPatch(self, p):
+        self._currentGraph.patch(p)
+        
+    def _sendPatch(self, jsonPatch):
+        self.handler.sendEvent(message=jsonPatch, event='patch')
     
 class GraphClients(object):
-    """All the active EventClient objects"""
+    """
+    All the active GraphClient objects
+
+    To handle all the overlapping-statement cases, we store a set of
+    true statements along with the sources that are currently
+    asserting them and the requesters who currently know them. As
+    statements come and go, we make patches to send to requesters.
+    
+    todo: reconnect patchsources that go down and deal with their graph diffs
+    """
     def __init__(self):
-        self.clients = {}  # url: EventClient
-        self.listeners = {}  # url: [GraphClient]
-
+        self.clients = {}  # url: PatchSource
+        self.handlers = set()  # handler
+        self.listeners = {}  # url: [handler]  (handler may appear under multiple urls)
+        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
+        
     def addSseHandler(self, handler, streamId):
         log.info('addSseHandler %r %r', handler, streamId)
         matches = [s for s in config['streams'] if s['id'] == streamId]
         if len(matches) != 1:
             raise ValueError("%s matches for %r" % (len(matches), streamId))
-        ecs = []
+
+        self.handlers.add(handler)
         for source in matches[0]['sources']:
             if source not in self.clients:
-                self.clients[source] = PatchSource(source)
-            ecs.append(self.clients[source])
+                ps = self.clients[source] = PatchSource(source)
+                ps.addPatchListener(lambda p, source=source: self._onPatch(source, p))
+            self.listeners.setdefault(source, []).append(handler)
+        self._sendUpdatePatch(handler)
+
+    def _onPatch(self, source, p):
+
+        for stmt in p.addQuads:
+            sourceUrls, handlers = self.statements[stmt]
+            if source in sourceUrls:
+                raise ValueError("%s added stmt that it already had: %s" % (source, stmt))
+            sourceUrls.add(source)
+        for stmt in p.delQuads:
+            sourceUrls, handlers = self.statements[stmt]
+            if source not in sourceUrls:
+                raise ValueError("%s deleting stmt that it didn't have: %s" % (source, stmt))
+            sourceUrls.remove(source)
+
+        for h in self.handlers:
+            self._sendUpdatePatch(h)
             
-        self.listeners.setdefault(source, []).append(GraphClient(handler, ecs))
-        print self.__dict__
+    def _sendUpdatePatch(self, handler):
+        """send a patch event out this handler to bring it up to date with self.statements"""
+        adds = []
+        dels = []
+        statementsToClear = []
+        for stmt, (sources, handlers) in self.statements.iteritems():
+            if sources and (handler not in handlers):
+                adds.append(stmt)
+                handlers.add(handler)
+            if not sources and (handler in handlers):
+                dels.append(stmt)
+                handlers.remove(handler)
+                statementsToClear.append(stmt)
+        # todo: cleanup statementsToClear
+        p = Patch(addQuads=adds, delQuads=dels)
+        if not p.isNoop():
+            log.debug("send patch %s to %s", p.shortSummary(), handler)
+            handler.sendEvent(message=jsonFromPatch(p), event='patch')
         
     def removeSseHandler(self, handler):
         log.info('removeSseHandler %r', handler)
-        for url, graphClients in self.listeners.items():
+        for url, handlers in self.listeners.items():
             keep = []
-            for gc in graphClients:
-                if gc.handler != handler:
-                    keep.append(gc)
-            graphClients[:] = keep
+            for h in handlers:
+                if h != handler:
+                    keep.append(h)
+            handlers[:] = keep
             if not keep:
                 self.clients[url].stop()
                 del self.clients[url]
                 del self.listeners[url]
+        self.handlers.remove(handler)
     
 class SomeGraph(cyclone.sse.SSEHandler):
     def __init__(self, application, request):
--- a/service/reasoning/twisted_sse_demo/eventsource.py	Fri Aug 19 10:59:39 2016 -0700
+++ b/service/reasoning/twisted_sse_demo/eventsource.py	Fri Aug 19 22:37:01 2016 -0700
@@ -39,7 +39,6 @@
         d.addCallback(self.cbRequest)
 
     def cbRequest(self, response):
-        print 'cbRequest', response.code
         if response.code != 200:
             self.callErrorHandler("non 200 response received: %d" %
                                   response.code)