changeset 300:371af6e92b5e

local state statements and self.statements rewrite Ignore-this: 731cac6010e37305053061f637c8729f
author drewp@bigasterisk.com
date Sat, 20 Aug 2016 23:34:04 -0700
parents 5084a1f719c9
children 29f593aee67b
files service/reasoning/sse_collector.py
diffstat 1 files changed, 145 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/service/reasoning/sse_collector.py	Fri Aug 19 22:46:33 2016 -0700
+++ b/service/reasoning/sse_collector.py	Sat Aug 20 23:34:04 2016 -0700
@@ -25,7 +25,7 @@
 import sys, logging, traceback, json, collections
 from twisted.internet import reactor
 import cyclone.web, cyclone.sse
-from rdflib import ConjunctiveGraph
+from rdflib import ConjunctiveGraph, URIRef, Namespace
 from rdflib.parser import StringInputSource
 from docopt import docopt
 
@@ -38,13 +38,19 @@
 sys.path.append("/my/proj/light9")
 from light9.rdfdb.patch import Patch
 
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
+
+class ConnectionLost(object):
+    pass
+
 class PatchSource(object):
     """wrap EventSource so it emits Patch objects and has an explicit stop method."""
     def __init__(self, url):
         self.url = url
         self._listeners = set()
         log.info('start read from %s', url)
-        self._eventSource = EventSource(url)
+        self._eventSource = EventSource(url.toPython().encode('utf8'))
         self._eventSource.protocol.delimiter = '\n'
 
         self._eventSource.addEventListener('fullGraph', self._onFullGraph)
@@ -55,7 +61,7 @@
             g = ConjunctiveGraph()
             g.parse(StringInputSource(message), format='json-ld')
             p = Patch(addGraph=g)
-            self._sendPatch(p)
+            self._sendPatch(p, fullGraph=True)
         except:
             log.error(traceback.format_exc())
             raise
@@ -63,23 +69,26 @@
     def _onMessage(self, message):
         try:
             p = patchFromJson(message)
-            self._sendPatch(p)
+            self._sendPatch(p, fullGraph=False)
         except:
             log.error(traceback.format_exc())
             raise
 
-    def _sendPatch(self, p):
+    def _sendPatch(self, p, fullGraph):
         log.debug('PatchSource received patch %s', p.shortSummary())
         for lis in self._listeners:
-            lis(p)
+            lis(p, fullGraph=fullGraph)
         
     def addPatchListener(self, func):
+        """
+        func(patch or ConnectionLost, fullGraph=[True if the patch is the initial full graph])
+        """
         self._listeners.add(func)
 
     def stop(self):
         log.info('stop read from %s', self.url)
         try:
-            self._eventSource.protocol.stopProducing() #?
+            self._eventSource.protocol.stopProducing() # needed?
         except AttributeError:
             pass
         self._eventSource = None
@@ -88,35 +97,42 @@
         if self._eventSource:
             raise ValueError
 
-class GraphClient(object):
-    """A listener of some PatchSources that emits patches to a cyclone SSEHandler."""
-
-    def __init__(self, handler):
-        self.handler = handler
-
-        # The graph that the requester knows.
-        # 
-        # Note that often, 2 requests for the same streamId would have
-        # the same graph contents in this attribute and ought to share
-        # it. But, that's a little harder to write, and if clients
-        # want different throttling rates or have stalled different
-        # amounts, their currentGraph contents might drift apart
-        # temporarily.
-        self._currentGraph = PatchableGraph()
-        self._currentGraph.addObserver(self._sendPatch)
-
-    def addPatchSource(self, ps):
-        """Connect this object to a PatchSource whose patches should get applied to our output graph"""
-        # this is never getting released, so we'll keep sending until
-        # no one wants the source anymore.
-        ps.addPatchListener(self._onPatch)
-
-    def _onPatch(self, p):
-        self._currentGraph.patch(p)
+class LocalStatements(object):
+    def __init__(self, applyPatch):
+        self.applyPatch = applyPatch
+        self._sourceState = {} # source: state URIRef
         
-    def _sendPatch(self, jsonPatch):
-        self.handler.sendEvent(message=jsonPatch, event='patch')
-    
+    def setSourceState(self, source, state):
+        """
+        Apply a patch to the COLLECTOR graph that records the state of
+        this source. Pass state=None to remove the source.
+        """
+        oldState = self._sourceState.get(source, None)
+        if state == oldState:
+            return
+        log.info('source state %s -> %s', source, state)
+        if oldState is None:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], state, COLLECTOR),
+            ]))
+        elif state is None:
+            del self._sourceState[source]
+            self.applyPatch(COLLECTOR, Patch(delQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+        else:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(
+                addQuads=[
+                (source, ROOM['state'], state, COLLECTOR),
+                ],
+                delQuads=[
+                    (source, ROOM['state'], oldState, COLLECTOR),
+                ]))
+            
 class GraphClients(object):
     """
     All the active GraphClient objects
@@ -132,24 +148,55 @@
         self.clients = {}  # url: PatchSource
         self.handlers = set()  # handler
         self.listeners = {}  # url: [handler]  (handler may appear under multiple urls)
+
+        # This table holds statements asserted by any of our sources
+        # plus local statements that we introduce (source is
+        # http://bigasterisk.com/sse_collector/).
         self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)`
-        
-    def addSseHandler(self, handler, streamId):
-        log.info('addSseHandler %r %r', handler, streamId)
-        matches = [s for s in config['streams'] if s['id'] == streamId]
-        if len(matches) != 1:
-            raise ValueError("%s matches for %r" % (len(matches), streamId))
+
+        self._localStatements = LocalStatements(self._onPatch)
 
-        self.handlers.add(handler)
-        for source in matches[0]['sources']:
-            if source not in self.clients:
-                ps = self.clients[source] = PatchSource(source)
-                ps.addPatchListener(lambda p, source=source: self._onPatch(source, p))
-            self.listeners.setdefault(source, []).append(handler)
-        self._sendUpdatePatch(handler)
+    def _pprintTable(self):
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
+            print "%03d. (%s, %s, %s, %s) from %s to %s" % (
+                i,
+                stmt[0].n3(),
+                stmt[1].n3(),
+                stmt[2].n3(),
+                stmt[3].n3(),
+                ','.join(s.n3() for s in sources),
+                handlers)        
+            
+    def _sendUpdatePatch(self, handler):
+        """send a patch event out through this handler to bring it up to date with self.statements"""
+        p = self._makeSyncPatch(handler)
+        if not p.isNoop():
+            log.debug("send patch %s to %s", p.shortSummary(), handler)
+            handler.sendEvent(message=jsonFromPatch(p), event='patch')
 
-    def _onPatch(self, source, p):
+    def _makeSyncPatch(self, handler):
+        # todo: this could run all handlers at once, which is how we use it anyway
+        adds = []
+        dels = []
+        statementsToClear = []
+        for stmt, (sources, handlers) in self.statements.iteritems():
+            relevantToHandler = handler in sum((self.listeners.get(s, []) for s in sources), [])
+            handlerHasIt = handler in handlers
+            if relevantToHandler and not handlerHasIt:
+                adds.append(stmt)
+                handlers.add(handler)
+            elif not relevantToHandler and handlerHasIt:
+                dels.append(stmt)
+                handlers.remove(handler)
+                if not handlers:
+                    statementsToClear.append(stmt)
+                    
+        for stmt in statementsToClear:
+            del self.statements[stmt]
 
+        return Patch(addQuads=adds, delQuads=dels)
+        
+    def _onPatch(self, source, p, fullGraph=False):
         for stmt in p.addQuads:
             sourceUrls, handlers = self.statements[stmt]
             if source in sourceUrls:
@@ -163,28 +210,43 @@
 
         for h in self.handlers:
             self._sendUpdatePatch(h)
-            
-    def _sendUpdatePatch(self, handler):
-        """send a patch event out this handler to bring it up to date with self.statements"""
-        adds = []
-        dels = []
-        statementsToClear = []
-        for stmt, (sources, handlers) in self.statements.iteritems():
-            if sources and (handler not in handlers):
-                adds.append(stmt)
-                handlers.add(handler)
-            if not sources and (handler in handlers):
-                dels.append(stmt)
-                handlers.remove(handler)
-                statementsToClear.append(stmt)
-        # todo: cleanup statementsToClear
-        p = Patch(addQuads=adds, delQuads=dels)
-        if not p.isNoop():
-            log.debug("send patch %s to %s", p.shortSummary(), handler)
-            handler.sendEvent(message=jsonFromPatch(p), event='patch')
+
+        if log.isEnabledFor(logging.DEBUG):
+            self._pprintTable()
+
+        if source != COLLECTOR:
+            if fullGraph:
+                self._localStatements.setSourceState(source, ROOM['fullGraphReceived'])
+            else:
+                self._localStatements.setSourceState(source, ROOM['patchesReceived'])
+        
+    def addSseHandler(self, handler, streamId):
+        log.info('addSseHandler %r %r', handler, streamId)
+        matches = [s for s in config['streams'] if s['id'] == streamId]
+        if len(matches) != 1:
+            raise ValueError("%s matches for %r" % (len(matches), streamId))
+
+        self.handlers.add(handler)
+        for source in map(URIRef, matches[0]['sources']):
+            if source not in self.clients:
+                self._localStatements.setSourceState(source, ROOM['connect'])
+                ps = self.clients[source] = PatchSource(source)
+                ps.addPatchListener(
+                    lambda p, fullGraph, source=source: self._onPatch(source, p, fullGraph))
+            self.listeners.setdefault(source, []).append(handler)
+        self._sendUpdatePatch(handler)
         
     def removeSseHandler(self, handler):
         log.info('removeSseHandler %r', handler)
+        
+        statementsToClear = []
+        for stmt, (sources, handlers) in self.statements.iteritems():
+            handlers.discard(handler)
+            if not sources and not handlers:
+                statementsToClear.append(stmt)
+        for stmt in statementsToClear:
+            del self.statements[stmt]
+                
         for url, handlers in self.listeners.items():
             keep = []
             for h in handlers:
@@ -192,11 +254,22 @@
                     keep.append(h)
             handlers[:] = keep
             if not keep:
-                self.clients[url].stop()
-                del self.clients[url]
-                del self.listeners[url]
+                self._stopClient(url)
         self.handlers.remove(handler)
-    
+
+    def _stopClient(self, url):
+        self.clients[url].stop()
+
+        for stmt, (sources, handlers) in self.statements.iteritems():
+            sources.discard(url)
+        
+        self._localStatements.setSourceState(url, None)
+        del self.clients[url]
+        del self.listeners[url]
+
+        
+        
+        
 class SomeGraph(cyclone.sse.SSEHandler):
     def __init__(self, application, request):
         cyclone.sse.SSEHandler.__init__(self, application, request)