changeset 1156:ee168d55524a

reasoning & collector move into docker images Ignore-this: 67e97d307eba96791cbe77e57c57ad57 darcs-hash:47056d579a870b473e95f4eb7897aae0a97c03cc
author drewp <drewp@bigasterisk.com>
date Mon, 03 Sep 2018 00:45:34 -0700
parents 3d478b05f9b1
children 6d67b23f3a75
files lib/patchsource.py lib/twisted_sse_demo/README.md lib/twisted_sse_demo/__init__.py lib/twisted_sse_demo/api_example.py lib/twisted_sse_demo/eventsource.py lib/twisted_sse_demo/requirements.txt lib/twisted_sse_demo/sse_client.py lib/twisted_sse_demo/sse_server.py service/collector/Dockerfile service/collector/makefile service/collector/requirements.txt service/collector/sse_collector.py service/reasoning/Dockerfile service/reasoning/inputgraph.py service/reasoning/makefile service/reasoning/patchsource.py service/reasoning/reasoning.py service/reasoning/requirements.txt service/reasoning/sse_collector.py service/reasoning/twisted_sse_demo/README.md service/reasoning/twisted_sse_demo/__init__.py service/reasoning/twisted_sse_demo/api_example.py service/reasoning/twisted_sse_demo/eventsource.py service/reasoning/twisted_sse_demo/requirements.txt service/reasoning/twisted_sse_demo/sse_client.py service/reasoning/twisted_sse_demo/sse_server.py
diffstat 26 files changed, 879 insertions(+), 822 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/patchsource.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,143 @@
+import sys, logging
+import traceback
+from twisted.internet import reactor, defer
+from twisted_sse_demo.eventsource import EventSource
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+
+sys.path.append("../../lib")
+from patchablegraph import patchFromJson
+
+sys.path.append("/my/proj/rdfdb")
+from rdfdb.patch import Patch
+
+log = logging.getLogger('fetch')
+
+class PatchSource(object):
+    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
+    def __init__(self, url):
+        self.url = url
+
+        # add callbacks to these to learn if we failed to connect
+        # (approximately) or if the ccnnection was unexpectedly lost
+        self.connectionFailed = defer.Deferred()
+        self.connectionLost = defer.Deferred()
+        
+        self._listeners = set()
+        log.info('start read from %s', url)
+        # note: fullGraphReceived isn't guaranteed- the stream could
+        # start with patches
+        self._fullGraphReceived = False
+        self._eventSource = EventSource(url.toPython().encode('utf8'))
+        self._eventSource.protocol.delimiter = '\n'
+
+        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
+        self._eventSource.addEventListener('patch', self._onPatch)
+        self._eventSource.onerror(self._onError)
+        
+        origSet = self._eventSource.protocol.setFinishedDeferred
+        def sfd(d):
+            origSet(d)
+            d.addCallback(self._onDisconnect)
+        self._eventSource.protocol.setFinishedDeferred = sfd
+
+    def stats(self):
+        return {
+            'url': self.url,
+            'fullGraphReceived': self._fullGraphReceived,
+        }
+        
+    def addPatchListener(self, func):
+        """
+        func(patch, fullGraph=[true if the patch is the initial fullgraph])
+        """
+        self._listeners.add(func)
+
+    def stop(self):
+        log.info('stop read from %s', self.url)
+        try:
+            self._eventSource.protocol.stopProducing() # needed?
+        except AttributeError:
+            pass
+        self._eventSource = None
+
+    def _onDisconnect(self, a):
+        log.debug('PatchSource._onDisconnect from %s', self.url)
+        # skip this if we're doing a stop?
+        self.connectionLost.callback(None)
+
+    def _onError(self, msg):
+        log.debug('PatchSource._onError from %s %r', self.url, msg)
+        if not self._fullGraphReceived:
+            self.connectionFailed.callback(msg)
+        else:
+            self.connectionLost.callback(msg)
+
+    def _onFullGraph(self, message):
+        try:
+            g = ConjunctiveGraph()
+            g.parse(StringInputSource(message), format='json-ld')
+            p = Patch(addGraph=g)
+            self._sendPatch(p, fullGraph=True)
+        except:
+            log.error(traceback.format_exc())
+            raise
+        self._fullGraphReceived = True
+            
+    def _onPatch(self, message):
+        try:
+            p = patchFromJson(message)
+            self._sendPatch(p, fullGraph=False)
+        except:
+            log.error(traceback.format_exc())
+            raise
+
+    def _sendPatch(self, p, fullGraph):
+        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
+        for lis in self._listeners:
+            lis(p, fullGraph=fullGraph)
+        
+    def __del__(self):
+        if self._eventSource:
+            raise ValueError
+
+class ReconnectingPatchSource(object):
+    """
+    PatchSource api, but auto-reconnects internally and takes listener
+    at init time to not miss any patches. You'll get another
+    fullGraph=True patch if we have to reconnect.
+
+    todo: generate connection stmts in here
+    """
+    def __init__(self, url, listener):
+        self.url = url
+        self._stopped = False
+        self._listener = listener
+        self._reconnect()
+
+    def _reconnect(self):
+        if self._stopped:
+            return
+        self._ps = PatchSource(self.url)
+        self._ps.addPatchListener(self._onPatch)
+        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
+        self._ps.connectionLost.addCallback(self._onConnectionLost)        
+
+    def _onPatch(self, p, fullGraph):
+        self._listener(p, fullGraph=fullGraph)
+
+    def stats(self):
+        return {
+            'reconnectedPatchSource': self._ps.stats(),
+        }
+        
+    def stop(self):
+        self._stopped = True
+        self._ps.stop()
+        
+    def _onConnectionFailed(self, arg):
+        reactor.callLater(60, self._reconnect)
+        
+    def _onConnectionLost(self, arg):
+        reactor.callLater(60, self._reconnect)        
+            
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/README.md	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,16 @@
+Twisted SSE demo
+================
+
+A twisted web server that implements server sent events (SSE)
+
+To run this demo:
+
+    python sse_twisted_web.py
+    
+Open up http://localhost:12000 in your browser.
+
+To publish events:
+
+    curl -d 'data=Hello!' http://localhost:12000/publish
+    
+You should see the data you publish in your browser. That's it!
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/__init__.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,1 @@
+# from https://github.com/juggernaut/twisted-sse-demo
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/api_example.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,13 @@
+import time
+
+from eventsource import EventSource
+
+EVENTSOURCE_URL = 'http://localhost:12000/subscribe'
+
+def onmessage(data):
+    print 'Got payload with data %s' % data
+
+if __name__ == '__main__':
+    eventSource = EventSource(EVENTSOURCE_URL)
+    eventSource.onmessage(onmessage, callInThread=True)
+    time.sleep(20)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/eventsource.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,76 @@
+from crochet import setup, run_in_reactor
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from sse_client import EventSourceProtocol
+
+setup()
+
+
+class EventSource(object):
+    """
+    The main EventSource class
+    """
+    def __init__(self, url):
+        self.url = url
+        self.protocol = EventSourceProtocol()
+        self.errorHandler = None
+        self.stashedError = None
+        self.connect()
+
+    @run_in_reactor
+    def connect(self):
+        """
+        Connect to the event source URL
+        """
+        agent = Agent(reactor)
+        d = agent.request(
+            'GET',
+            self.url,
+            Headers({
+                'User-Agent': ['Twisted SSE Client'],
+                'Cache-Control': ['no-cache'],
+                'Accept': ['text/event-stream; charset=utf-8'],
+            }),
+            None)
+        d.addErrback(self.connectError)
+        d.addCallback(self.cbRequest)
+
+    def cbRequest(self, response):
+        if response.code != 200:
+            self.callErrorHandler("non 200 response received: %d" %
+                                  response.code)
+        else:
+            finished = Deferred()
+            self.protocol.setFinishedDeferred(finished)
+            response.deliverBody(self.protocol)
+            return finished
+
+    def connectError(self, ignored):
+        self.callErrorHandler("error connecting to endpoint: %s" % self.url)
+
+    def callErrorHandler(self, msg):
+        if self.errorHandler:
+            func, callInThread = self.errorHandler
+            if callInThread:
+                reactor.callInThread(func, msg)
+            else:
+                func(msg)
+        else:
+            self.stashedError = msg
+
+    def onerror(self, func, callInThread=False):
+        self.errorHandler = func, callInThread
+        if self.stashedError:
+            self.callErrorHandler(self.stashedError)
+
+    def onmessage(self, func, callInThread=False):
+        self.addEventListener('message', func, callInThread)
+
+    def addEventListener(self, event, func, callInThread=False):
+        callback = func
+        if callInThread:
+            callback = lambda data: reactor.callInThread(func, data)
+        self.protocol.addCallback(event, callback)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/requirements.txt	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,1 @@
+crochet
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/sse_client.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,67 @@
+from twisted.protocols.basic import LineReceiver
+
+
+class EventSourceProtocol(LineReceiver):
+    def __init__(self):
+        self.MAX_LENGTH = 1 << 20
+        self.callbacks = {}
+        self.finished = None
+        # Initialize the event and data buffers
+        self.event = 'message'
+        self.data = ''
+
+    def lineLengthExceeded(self, line):
+        print "line too long"
+        raise NotImplementedError
+
+    def setFinishedDeferred(self, d):
+        self.finished = d
+
+    def addCallback(self, event, func):
+        self.callbacks[event] = func
+        
+    def lineReceived(self, line):
+        if line == '':
+            # Dispatch event
+            self.dispatchEvent()
+        else:
+            try:
+                field, value = line.split(':', 1)
+                # If value starts with a space, strip it.
+                value = lstrip(value)
+            except ValueError:
+                # We got a line with no colon, treat it as a field(ignore)
+                return
+
+            if field == '':
+                # This is a comment; ignore
+                pass
+            elif field == 'data':
+                self.data += value + '\n'
+            elif field == 'event':
+                self.event = value
+            elif field == 'id':
+                # Not implemented
+                pass
+            elif field == 'retry':
+                # Not implemented
+                pass
+
+    def connectionLost(self, reason):
+        if self.finished:
+            self.finished.callback(None)
+
+    def dispatchEvent(self):
+        """
+        Dispatch the event
+        """
+        # If last character is LF, strip it.
+        if self.data.endswith('\n'):
+            self.data = self.data[:-1]
+        if self.event in self.callbacks:
+            self.callbacks[self.event](self.data)
+        self.data = ''
+        self.event = 'message'
+
+def lstrip(value):
+    return value[1:] if value.startswith(' ') else value
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/sse_server.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,95 @@
+import sys
+
+from twisted.web import server, resource
+from twisted.internet import reactor
+from twisted.python import log
+
+
+class Root(resource.Resource):
+    """
+    Root resource; serves JavaScript
+    """
+    def getChild(self, name, request):
+        if name == '':
+            return self
+        return resource.Resource.getChild(self, name, request)
+
+    def render_GET(self, request):
+        return r"""
+        <html>
+            <head>
+                <script language="JavaScript">
+                        eventSource = new EventSource("http://localhost:12000/subscribe");
+                        eventSource.onmessage = function(event) {
+                            element = document.getElementById("event-data");
+                            element.innerHTML = event.data;
+                        };
+                    </script>
+            </head>
+            <body>
+                <h1> Welcome to the SSE demo </h1>
+                <h3> Event data: </h3>
+                <p id="event-data"></p>
+            </body>
+        </html>
+        """
+
+
+class Subscribe(resource.Resource):
+    """
+    Implements the subscribe resource
+    """
+    isLeaf = True
+
+    def __init__(self):
+        self.subscribers = set()
+
+    def render_GET(self, request):
+        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
+        request.setResponseCode(200)
+        self.subscribers.add(request)
+        d = request.notifyFinish()
+        d.addBoth(self.removeSubscriber)
+        log.msg("Adding subscriber...")
+        request.write("")
+        return server.NOT_DONE_YET
+
+    def publishToAll(self, data):
+        for subscriber in self.subscribers:
+            for line in data:
+                subscriber.write("data: %s\r\n" % line)
+            # NOTE: the last CRLF is required to dispatch the event at the client
+            subscriber.write("\r\n")
+
+    def removeSubscriber(self, subscriber):
+        if subscriber in self.subscribers:
+            log.msg("Removing subscriber..")
+            self.subscribers.remove(subscriber)
+
+
+class Publish(resource.Resource):
+    """
+    Implements the publish resource
+    """
+    isLeaf = True
+
+    def __init__(self, subscriber):
+        self.subscriber = subscriber
+
+    def render_POST(self, request):
+        if 'data' not in request.args:
+            request.setResponseCode(400)
+            return "The parameter 'data' must be set\n"
+        data = request.args.get('data')
+        self.subscriber.publishToAll(data)
+        return 'Thank you for publishing data %s\n' % '\n'.join(data)
+
+
+root = Root()
+subscribe = Subscribe()
+root.putChild('subscribe', subscribe)
+root.putChild('publish', Publish(subscribe))
+site = server.Site(root)
+reactor.listenTCP(12000, site)
+log.startLogging(sys.stdout)
+reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/Dockerfile	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,13 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip install -r requirements.txt
+
+COPY twisted_sse_demo ./twisted_sse_demo
+COPY *.py req* ./
+
+EXPOSE 9072
+
+CMD [ "python", "./sse_collector.py" ]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/makefile	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,22 @@
+JOB=collector
+PORT=9072
+
+TAG=bang6:5000/${JOB}_x86:latest
+
+build_image:
+	rm -rf tmp_ctx
+	mkdir -p tmp_ctx
+	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* tmp_ctx
+	docker build --network=host -t ${TAG} tmp_ctx
+	docker push ${TAG}
+	rm -r tmp_ctx
+
+shell:
+	docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
+
+local_run:
+	docker run --rm -it -p ${PORT}:${PORT} \
+          -v `pwd`:/mnt \
+          --net=host \
+          ${TAG} \
+          python /mnt/sse_collector.py -v
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/requirements.txt	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,16 @@
+crochet==1.5.0
+cyclone
+docopt
+ipdb
+service_identity
+twisted
+
+#rdflib==4.2.2
+git+http://github.com/drewp/rdflib.git@5fa18be1231a5e4dfc86ec28f2f754158c6f6f0b#egg=rdflib
+
+#rdflib-jsonld==0.4.0
+git+http://github.com/RDFLib/rdflib-jsonld@cc5f005b222105724cd59c6069df9982fbd28c98#egg=rdflib_jsonld
+
+git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi
+git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
+https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/sse_collector.py	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,366 @@
+from __future__ import division
+"""
+requesting /graph/foo returns an SSE patch stream that's the
+result of fetching multiple other SSE patch streams. The result stream
+may include new statements injected by this service.
+
+Future:
+- filter out unneeded stmts from the sources
+- give a time resolution and concatenate any patches that come faster than that res
+"""
+from crochet import no_setup
+no_setup()
+
+import sys, logging, collections, json, time
+from twisted.internet import reactor
+import cyclone.web, cyclone.sse
+from rdflib import URIRef, Namespace
+from docopt import docopt
+
+sys.path.append('/opt') # docker is putting ../../lib/ here
+from logsetup import log
+from patchablegraph import jsonFromPatch
+
+from rdfdb.patch import Patch
+
+from patchsource import ReconnectingPatchSource
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
+
+from sse_collector_config import config
+
+class LocalStatements(object):
+    """
+    functions that make statements originating from sse_collector itself
+    """
+    def __init__(self, applyPatch):
+        self.applyPatch = applyPatch
+        self._sourceState = {} # source: state URIRef
+
+    def setSourceState(self, source, state):
+        """
+        add a patch to the COLLECTOR graph about the state of this
+        source. state=None to remove the source.
+        """
+        oldState = self._sourceState.get(source, None)
+        if state == oldState:
+            return
+        log.info('source state %s -> %s', source, state)
+        if oldState is None:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(addQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], state, COLLECTOR),
+            ]))
+        elif state is None:
+            del self._sourceState[source]
+            self.applyPatch(COLLECTOR, Patch(delQuads=[
+                (COLLECTOR, ROOM['source'], source, COLLECTOR),
+                (source, ROOM['state'], oldState, COLLECTOR),
+            ]))
+        else:
+            self._sourceState[source] = state
+            self.applyPatch(COLLECTOR, Patch(
+                addQuads=[
+                (source, ROOM['state'], state, COLLECTOR),
+                ],
+                delQuads=[
+                    (source, ROOM['state'], oldState, COLLECTOR),
+                ]))
+
+def abbrevTerm(t):
+    if isinstance(t, URIRef):
+        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
+                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+    return t
+
+def abbrevStmt(stmt):
+    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
+    
+class ActiveStatements(object):
+    def __init__(self):
+
+        # This table holds statements asserted by any of our sources
+        # plus local statements that we introduce (source is
+        # http://bigasterisk.com/sse_collector/).
+        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)`
+
+    def stats(self):
+        return {
+'len': len(self.statements),
+            }
+        
+    def _postDeleteStatements(self):
+        statements = self.statements
+        class PostDeleter(object):
+            def __enter__(self):
+                self._garbage = []
+                return self
+            def add(self, stmt):
+                self._garbage.append(stmt)
+            def __exit__(self, type, value, traceback):
+                if type is not None:
+                    raise
+                for stmt in self._garbage:
+                    del statements[stmt]
+        return PostDeleter()
+        
+    def pprintTable(self):
+        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
+            print "%03d. %-80s from %s to %s" % (
+                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)        
+
+    def makeSyncPatch(self, handler, sources):
+        # todo: this could run all handlers at once, which is how we use it anyway
+        adds = []
+        dels = []
+        
+        with self._postDeleteStatements() as garbage:
+            for stmt, (stmtSources, handlers) in self.statements.iteritems():
+                belongsInHandler = not set(sources).isdisjoint(stmtSources)
+                handlerHasIt = handler in handlers
+                #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+                if belongsInHandler and not handlerHasIt:
+                    adds.append(stmt)
+                    handlers.add(handler)
+                elif not belongsInHandler and handlerHasIt:
+                    dels.append(stmt)
+                    handlers.remove(handler)
+                    if not handlers and not stmtSources:
+                        garbage.add(stmt)
+
+        return Patch(addQuads=adds, delQuads=dels)
+        
+    def applySourcePatch(self, source, p):
+        for stmt in p.addQuads:
+            sourceUrls, handlers = self.statements[stmt]
+            if source in sourceUrls:
+                raise ValueError("%s added stmt that it already had: %s" %
+                                 (source, abbrevStmt(stmt)))
+            sourceUrls.add(source)
+            
+        with self._postDeleteStatements() as garbage:
+            for stmt in p.delQuads:
+                sourceUrls, handlers = self.statements[stmt]
+                if source not in sourceUrls:
+                    raise ValueError("%s deleting stmt that it didn't have: %s" %
+                                     (source, abbrevStmt(stmt)))
+                sourceUrls.remove(source)
+                # this is rare, since some handler probably still has
+                # the stmt we're deleting, but it can happen e.g. when
+                # a handler was just deleted
+                if not sourceUrls and not handlers:
+                    garbage.add(stmt)
+
+    def replaceSourceStatements(self, source, stmts):
+        log.debug('replaceSourceStatements with %s stmts', len(stmts))
+        newStmts = set(stmts)
+
+        with self._postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.statements.iteritems():
+                if source in sources:
+                    if stmt not in stmts:
+                        sources.remove(source)
+                        if not sources and not handlers:
+                            garbage.add(stmt)
+                else:
+                    if stmt in stmts:
+                        sources.add(source)
+                newStmts.discard(stmt)
+
+        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
+
+    def discardHandler(self, handler):
+        with self._postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.statements.iteritems():
+                handlers.discard(handler)
+                if not sources and not handlers:
+                    garbage.add(stmt)
+
+    def discardSource(self, source):
+        with self._postDeleteStatements() as garbage:
+            for stmt, (sources, handlers) in self.statements.iteritems():
+                sources.discard(source)
+                if not sources and not handlers:
+                    garbage.add(stmt)
+                    
+class GraphClients(object):
+    """
+    All the active PatchSources and SSEHandlers
+
+    To handle all the overlapping-statement cases, we store a set of
+    true statements along with the sources that are currently
+    asserting them and the requesters who currently know them. As
+    statements come and go, we make patches to send to requesters.
+    """
+    def __init__(self):
+        self.clients = {}  # url: PatchSource (COLLECTOR is not listed)
+        self.handlers = set()  # handler
+        self.statements = ActiveStatements()
+        
+        self._localStatements = LocalStatements(self._onPatch)
+
+    def stats(self):
+        return {
+            'clients': [ps.stats() for ps in self.clients.values()],
+            'sseHandlers': [h.stats() for h in self.handlers],
+            'statements': self.statements.stats(),
+        }
+
+    def _sourcesForHandler(self, handler):
+        streamId = handler.streamId
+        matches = [s for s in config['streams'] if s['id'] == streamId]
+        if len(matches) != 1:
+            raise ValueError("%s matches for %r" % (len(matches), streamId))
+        return map(URIRef, matches[0]['sources']) + [COLLECTOR]
+        
+    def _onPatch(self, source, p, fullGraph=False):
+        if fullGraph:
+            # a reconnect may need to resend the full graph even
+            # though we've already sent some statements
+            self.statements.replaceSourceStatements(source, p.addQuads)
+        else:
+            self.statements.applySourcePatch(source, p)
+
+        self._sendUpdatePatch()
+
+        if log.isEnabledFor(logging.DEBUG):
+            self.statements.pprintTable()
+
+        if source != COLLECTOR:
+            self._localStatements.setSourceState(
+                source,
+                ROOM['fullGraphReceived'] if fullGraph else
+                ROOM['patchesReceived'])
+
+    def _sendUpdatePatch(self, handler=None):
+        """
+        send a patch event out this handler to bring it up to date with
+        self.statements
+        """
+        # reduce loops here- prepare all patches at once
+        for h in (self.handlers if handler is None else [handler]):
+            p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
+            if not p.isNoop():
+                log.debug("send patch %s to %s", p.shortSummary(), h)
+                # This can be a giant line, which was a problem once. Might be 
+                # nice for this service to try to break it up into multiple sends,
+                # although there's no guarantee at all since any single stmt 
+                # could be any length.
+                h.sendEvent(message=jsonFromPatch(p), event='patch')
+                
+    def addSseHandler(self, handler):
+        log.info('addSseHandler %r %r', handler, handler.streamId)
+
+        # fail early if id doesn't match
+        sources = self._sourcesForHandler(handler)
+
+        self.handlers.add(handler)
+        
+        for source in sources:
+            if source not in self.clients and source != COLLECTOR:
+                self._localStatements.setSourceState(source, ROOM['connect'])
+                self.clients[source] = ReconnectingPatchSource(
+                    source, listener=lambda p, fullGraph, source=source: self._onPatch(
+                        source, p, fullGraph))
+        self._sendUpdatePatch(handler)
+        
+    def removeSseHandler(self, handler):
+        log.info('removeSseHandler %r', handler)
+
+        self.statements.discardHandler(handler)
+
+        for source in self._sourcesForHandler(handler):
+            for otherHandler in self.handlers:
+                if (otherHandler != handler and
+                    source in self._sourcesForHandler(otherHandler)):
+                    break
+            else:
+                self._stopClient(source)
+            
+        self.handlers.remove(handler)
+
+    def _stopClient(self, url):
+        if url == COLLECTOR:
+            return
+            
+        self.clients[url].stop()
+
+        self.statements.discardSource(url)
+        
+        self._localStatements.setSourceState(url, None)
+        del self.clients[url]
+        
+
+class SomeGraph(cyclone.sse.SSEHandler):
+    _handlerSerial = 0
+    def __init__(self, application, request):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.streamId = request.uri[len('/graph/'):]
+        self.graphClients = self.settings.graphClients
+        self.created = time.time()
+        
+        self._serial = SomeGraph._handlerSerial
+        SomeGraph._handlerSerial += 1
+
+    def __repr__(self):
+        return '<Handler #%s>' % self._serial
+
+    def stats(self):
+        print self.__dict__
+        return {
+            'created': self.created,
+            'ageHours': (time.time() - self.created) / 3600,
+            'streamId': self.streamId,
+            'remoteIp': self.request.remote_ip,
+            'userAgent': self.request.headers.get('user-agent'),
+        }
+        
+    def bind(self):
+        self.graphClients.addSseHandler(self)
+        
+    def unbind(self):
+        self.graphClients.removeSseHandler(self)
+
+class Stats(cyclone.web.RequestHandler):
+    def get(self):
+        try:
+            stats = self.settings.graphClients.stats()
+        except:
+            import traceback; traceback.print_exc()
+            raise
+        
+        self.write(json.dumps({'graphClients': stats}, indent=2))
+
+class Root(cyclone.web.RequestHandler):
+    def get(self):
+        self.write('<html><body>sse_collector</body></html>')
+        
+if __name__ == '__main__':
+
+    arg = docopt("""
+    Usage: sse_collector.py [options]
+
+    -v   Verbose
+    """)
+    
+    if arg['-v']:
+        import twisted.python.log
+        twisted.python.log.startLogging(sys.stdout)
+        log.setLevel(logging.DEBUG)
+
+
+    graphClients = GraphClients()
+        
+    reactor.listenTCP(
+        9072,
+        cyclone.web.Application(
+            handlers=[
+                (r'/', Root),
+                (r'/stats', Stats),
+                (r'/graph/(.*)', SomeGraph),
+            ],
+            graphClients=graphClients),
+        interface='::')
+    reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/Dockerfile	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,14 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip install -r requirements.txt
+
+COPY twisted_sse_demo ./twisted_sse_demo
+COPY *.n3 *.py req* ./
+COPY input ./input
+
+EXPOSE 9071
+
+CMD [ "python", "./reasoning.py" ]
--- a/service/reasoning/inputgraph.py	Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/inputgraph.py	Mon Sep 03 00:45:34 2018 -0700
@@ -12,8 +12,8 @@
 
 from patchsource import ReconnectingPatchSource
 
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.rdflibpatch import patchQuads
+sys.path.append("/my/proj/rdfdb")
+from rdfdb.rdflibpatch import patchQuads
 
 log = logging.getLogger('fetch')
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/makefile	Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,23 @@
+JOB=reasoning
+PORT=9071
+
+TAG=bang6:5000/${JOB}_x86:latest
+
+build_image:
+	rm -rf tmp_ctx
+	mkdir -p tmp_ctx
+	cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py *.n3 input req* tmp_ctx
+	rsync -a input tmp_ctx/
+	docker build --network=host -t ${TAG} tmp_ctx
+	docker push ${TAG}
+	rm -r tmp_ctx
+
+shell:
+	docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG}  /bin/bash
+
+local_run:
+	docker run --rm -it -p ${PORT}:${PORT} \
+          -v `pwd`:/mnt \
+          --net=host \
+          ${TAG} \
+          python /mnt/${JOB}.py -iro
--- a/service/reasoning/patchsource.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,143 +0,0 @@
-import sys, logging
-import traceback
-from twisted.internet import reactor, defer
-from twisted_sse_demo.eventsource import EventSource
-from rdflib import ConjunctiveGraph
-from rdflib.parser import StringInputSource
-
-sys.path.append("../../lib")
-from patchablegraph import patchFromJson
-
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.patch import Patch
-
-log = logging.getLogger('fetch')
-
-class PatchSource(object):
-    """wrap EventSource so it emits Patch objects and has an explicit stop method."""
-    def __init__(self, url):
-        self.url = url
-
-        # add callbacks to these to learn if we failed to connect
-        # (approximately) or if the ccnnection was unexpectedly lost
-        self.connectionFailed = defer.Deferred()
-        self.connectionLost = defer.Deferred()
-        
-        self._listeners = set()
-        log.info('start read from %s', url)
-        # note: fullGraphReceived isn't guaranteed- the stream could
-        # start with patches
-        self._fullGraphReceived = False
-        self._eventSource = EventSource(url.toPython().encode('utf8'))
-        self._eventSource.protocol.delimiter = '\n'
-
-        self._eventSource.addEventListener('fullGraph', self._onFullGraph)
-        self._eventSource.addEventListener('patch', self._onPatch)
-        self._eventSource.onerror(self._onError)
-        
-        origSet = self._eventSource.protocol.setFinishedDeferred
-        def sfd(d):
-            origSet(d)
-            d.addCallback(self._onDisconnect)
-        self._eventSource.protocol.setFinishedDeferred = sfd
-
-    def stats(self):
-        return {
-            'url': self.url,
-            'fullGraphReceived': self._fullGraphReceived,
-        }
-        
-    def addPatchListener(self, func):
-        """
-        func(patch, fullGraph=[true if the patch is the initial fullgraph])
-        """
-        self._listeners.add(func)
-
-    def stop(self):
-        log.info('stop read from %s', self.url)
-        try:
-            self._eventSource.protocol.stopProducing() # needed?
-        except AttributeError:
-            pass
-        self._eventSource = None
-
-    def _onDisconnect(self, a):
-        log.debug('PatchSource._onDisconnect from %s', self.url)
-        # skip this if we're doing a stop?
-        self.connectionLost.callback(None)
-
-    def _onError(self, msg):
-        log.debug('PatchSource._onError from %s %r', self.url, msg)
-        if not self._fullGraphReceived:
-            self.connectionFailed.callback(msg)
-        else:
-            self.connectionLost.callback(msg)
-
-    def _onFullGraph(self, message):
-        try:
-            g = ConjunctiveGraph()
-            g.parse(StringInputSource(message), format='json-ld')
-            p = Patch(addGraph=g)
-            self._sendPatch(p, fullGraph=True)
-        except:
-            log.error(traceback.format_exc())
-            raise
-        self._fullGraphReceived = True
-            
-    def _onPatch(self, message):
-        try:
-            p = patchFromJson(message)
-            self._sendPatch(p, fullGraph=False)
-        except:
-            log.error(traceback.format_exc())
-            raise
-
-    def _sendPatch(self, p, fullGraph):
-        log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
-        for lis in self._listeners:
-            lis(p, fullGraph=fullGraph)
-        
-    def __del__(self):
-        if self._eventSource:
-            raise ValueError
-
-class ReconnectingPatchSource(object):
-    """
-    PatchSource api, but auto-reconnects internally and takes listener
-    at init time to not miss any patches. You'll get another
-    fullGraph=True patch if we have to reconnect.
-
-    todo: generate connection stmts in here
-    """
-    def __init__(self, url, listener):
-        self.url = url
-        self._stopped = False
-        self._listener = listener
-        self._reconnect()
-
-    def _reconnect(self):
-        if self._stopped:
-            return
-        self._ps = PatchSource(self.url)
-        self._ps.addPatchListener(self._onPatch)
-        self._ps.connectionFailed.addCallback(self._onConnectionFailed)
-        self._ps.connectionLost.addCallback(self._onConnectionLost)        
-
-    def _onPatch(self, p, fullGraph):
-        self._listener(p, fullGraph=fullGraph)
-
-    def stats(self):
-        return {
-            'reconnectedPatchSource': self._ps.stats(),
-        }
-        
-    def stop(self):
-        self._stopped = True
-        self._ps.stop()
-        
-    def _onConnectionFailed(self, arg):
-        reactor.callLater(60, self._reconnect)
-        
-    def _onConnectionLost(self, arg):
-        reactor.callLater(60, self._reconnect)        
-            
--- a/service/reasoning/reasoning.py	Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/reasoning.py	Mon Sep 03 00:45:34 2018 -0700
@@ -14,14 +14,12 @@
 When do we gather? The services should be able to trigger us, perhaps
 with PSHB, that their graph has changed.
 """
-
-
 from crochet import no_setup
 no_setup()
 
-
 import json, time, traceback, sys
 from logging import getLogger, DEBUG, WARN
+sys.path.append('/opt') # docker is putting lib/ here
 
 from colorlog import ColoredFormatter
 from docopt import docopt
@@ -38,7 +36,6 @@
 from inputgraph import InputGraph
 from escapeoutputstatements import unquoteOutputStatements
 
-sys.path.append("../../lib")
 from logsetup import log
 
 
--- a/service/reasoning/requirements.txt	Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/requirements.txt	Mon Sep 03 00:45:34 2018 -0700
@@ -1,11 +1,13 @@
-twisted==17.1.0
-cyclone==1.1
-ipdb==0.10.3
-docopt==0.6.2
+colorlog==2.6.0
+crochet==1.5.0
+cyclone
+docopt
+ipdb
+service_identity
+twisted
+
 rdflib==4.2.2
+rdflib_jsonld==0.4.0
 git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi
 git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
-service_identity==17.0.0
-crochet==1.5.0
-rdflib_jsonld==0.4.0
-
+https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
--- a/service/reasoning/sse_collector.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,396 +0,0 @@
-from __future__ import division
-"""
-requesting /graph/foo returns an SSE patch stream that's the
-result of fetching multiple other SSE patch streams. The result stream
-may include new statements injected by this service.
-
-Future:
-- filter out unneeded stmts from the sources
-- give a time resolution and concatenate any patches that come faster than that res
-"""
-
-config = {
-    'streams': [
-        {'id': 'home',
-         'sources': [
-# should be from :reasoning :source ?s
-'http://garage:9059/graph/events', # "garage pi"
-'http://kitchen:9059/graph/events', # "kitchen pi"
-'http://living:9059/graph/events', # "living room pi"
-'http://slash:9059/graph/events', # "slash arduino"
-'http://bed:9059/graph/events', # "bed pi"
-'http://brace6:9059/graph/events', # "brace arduino"
-'http://changing:9059/graph/events', # "changing pi"
-'http://bang:9075/graph/events', # "env"
-'http://bang:9070/graph/events', # "wifi usage"
-'http://bang:9099/graph/events', # "trails"
-'http://dash:9095/graph/events', # "dash monitor"
-'http://dash:9107/graph/events', # "dash x idle"
-'http://brace6:9095/graph/events', # "brace monitor"
-'http://brace6:9107/graph/events', # "brace x idle"
-'http://slash:9095/graph/events', # "slash monitor"
-'http://slash:9107/graph/events', # "slash x idle" 
-
-
-
-         ]
-     },
-    ]
-}
-
-from crochet import no_setup
-no_setup()
-
-import sys, logging, collections, json, time
-from twisted.internet import reactor
-import cyclone.web, cyclone.sse
-from rdflib import URIRef, Namespace
-from docopt import docopt
-
-
-sys.path.append("../../lib")
-from logsetup import log
-from patchablegraph import jsonFromPatch
-
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.patch import Patch
-
-from patchsource import ReconnectingPatchSource
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
-
-class LocalStatements(object):
-    """
-    functions that make statements originating from sse_collector itself
-    """
-    def __init__(self, applyPatch):
-        self.applyPatch = applyPatch
-        self._sourceState = {} # source: state URIRef
-
-    def setSourceState(self, source, state):
-        """
-        add a patch to the COLLECTOR graph about the state of this
-        source. state=None to remove the source.
-        """
-        oldState = self._sourceState.get(source, None)
-        if state == oldState:
-            return
-        log.info('source state %s -> %s', source, state)
-        if oldState is None:
-            self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(addQuads=[
-                (COLLECTOR, ROOM['source'], source, COLLECTOR),
-                (source, ROOM['state'], state, COLLECTOR),
-            ]))
-        elif state is None:
-            del self._sourceState[source]
-            self.applyPatch(COLLECTOR, Patch(delQuads=[
-                (COLLECTOR, ROOM['source'], source, COLLECTOR),
-                (source, ROOM['state'], oldState, COLLECTOR),
-            ]))
-        else:
-            self._sourceState[source] = state
-            self.applyPatch(COLLECTOR, Patch(
-                addQuads=[
-                (source, ROOM['state'], state, COLLECTOR),
-                ],
-                delQuads=[
-                    (source, ROOM['state'], oldState, COLLECTOR),
-                ]))
-
-def abbrevTerm(t):
-    if isinstance(t, URIRef):
-        return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
-                .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
-    return t
-
-def abbrevStmt(stmt):
-    return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
-    
-class ActiveStatements(object):
-    def __init__(self):
-
-        # This table holds statements asserted by any of our sources
-        # plus local statements that we introduce (source is
-        # http://bigasterisk.com/sse_collector/).
-        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)`
-
-    def stats(self):
-        return {
-'len': len(self.statements),
-            }
-        
-    def _postDeleteStatements(self):
-        statements = self.statements
-        class PostDeleter(object):
-            def __enter__(self):
-                self._garbage = []
-                return self
-            def add(self, stmt):
-                self._garbage.append(stmt)
-            def __exit__(self, type, value, traceback):
-                if type is not None:
-                    raise
-                for stmt in self._garbage:
-                    del statements[stmt]
-        return PostDeleter()
-        
-    def pprintTable(self):
-        for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
-            print "%03d. %-80s from %s to %s" % (
-                i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)        
-
-    def makeSyncPatch(self, handler, sources):
-        # todo: this could run all handlers at once, which is how we use it anyway
-        adds = []
-        dels = []
-        
-        with self._postDeleteStatements() as garbage:
-            for stmt, (stmtSources, handlers) in self.statements.iteritems():
-                belongsInHandler = not set(sources).isdisjoint(stmtSources)
-                handlerHasIt = handler in handlers
-                #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
-                if belongsInHandler and not handlerHasIt:
-                    adds.append(stmt)
-                    handlers.add(handler)
-                elif not belongsInHandler and handlerHasIt:
-                    dels.append(stmt)
-                    handlers.remove(handler)
-                    if not handlers and not stmtSources:
-                        garbage.add(stmt)
-
-        return Patch(addQuads=adds, delQuads=dels)
-        
-    def applySourcePatch(self, source, p):
-        for stmt in p.addQuads:
-            sourceUrls, handlers = self.statements[stmt]
-            if source in sourceUrls:
-                raise ValueError("%s added stmt that it already had: %s" %
-                                 (source, abbrevStmt(stmt)))
-            sourceUrls.add(source)
-            
-        with self._postDeleteStatements() as garbage:
-            for stmt in p.delQuads:
-                sourceUrls, handlers = self.statements[stmt]
-                if source not in sourceUrls:
-                    raise ValueError("%s deleting stmt that it didn't have: %s" %
-                                     (source, abbrevStmt(stmt)))
-                sourceUrls.remove(source)
-                # this is rare, since some handler probably still has
-                # the stmt we're deleting, but it can happen e.g. when
-                # a handler was just deleted
-                if not sourceUrls and not handlers:
-                    garbage.add(stmt)
-
-    def replaceSourceStatements(self, source, stmts):
-        log.debug('replaceSourceStatements with %s stmts', len(stmts))
-        newStmts = set(stmts)
-
-        with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
-                if source in sources:
-                    if stmt not in stmts:
-                        sources.remove(source)
-                        if not sources and not handlers:
-                            garbage.add(stmt)
-                else:
-                    if stmt in stmts:
-                        sources.add(source)
-                newStmts.discard(stmt)
-
-        self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
-
-    def discardHandler(self, handler):
-        with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
-                handlers.discard(handler)
-                if not sources and not handlers:
-                    garbage.add(stmt)
-
-    def discardSource(self, source):
-        with self._postDeleteStatements() as garbage:
-            for stmt, (sources, handlers) in self.statements.iteritems():
-                sources.discard(source)
-                if not sources and not handlers:
-                    garbage.add(stmt)
-                    
-class GraphClients(object):
-    """
-    All the active PatchSources and SSEHandlers
-
-    To handle all the overlapping-statement cases, we store a set of
-    true statements along with the sources that are currently
-    asserting them and the requesters who currently know them. As
-    statements come and go, we make patches to send to requesters.
-    """
-    def __init__(self):
-        self.clients = {}  # url: PatchSource (COLLECTOR is not listed)
-        self.handlers = set()  # handler
-        self.statements = ActiveStatements()
-        
-        self._localStatements = LocalStatements(self._onPatch)
-
-    def stats(self):
-        return {
-            'clients': [ps.stats() for ps in self.clients.values()],
-            'sseHandlers': [h.stats() for h in self.handlers],
-            'statements': self.statements.stats(),
-        }
-
-    def _sourcesForHandler(self, handler):
-        streamId = handler.streamId
-        matches = [s for s in config['streams'] if s['id'] == streamId]
-        if len(matches) != 1:
-            raise ValueError("%s matches for %r" % (len(matches), streamId))
-        return map(URIRef, matches[0]['sources']) + [COLLECTOR]
-        
-    def _onPatch(self, source, p, fullGraph=False):
-        if fullGraph:
-            # a reconnect may need to resend the full graph even
-            # though we've already sent some statements
-            self.statements.replaceSourceStatements(source, p.addQuads)
-        else:
-            self.statements.applySourcePatch(source, p)
-
-        self._sendUpdatePatch()
-
-        if log.isEnabledFor(logging.DEBUG):
-            self.statements.pprintTable()
-
-        if source != COLLECTOR:
-            self._localStatements.setSourceState(
-                source,
-                ROOM['fullGraphReceived'] if fullGraph else
-                ROOM['patchesReceived'])
-
-    def _sendUpdatePatch(self, handler=None):
-        """
-        send a patch event out this handler to bring it up to date with
-        self.statements
-        """
-        # reduce loops here- prepare all patches at once
-        for h in (self.handlers if handler is None else [handler]):
-            p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
-            if not p.isNoop():
-                log.debug("send patch %s to %s", p.shortSummary(), h)
-                # This can be a giant line, which was a problem once. Might be 
-                # nice for this service to try to break it up into multiple sends,
-                # although there's no guarantee at all since any single stmt 
-                # could be any length.
-                h.sendEvent(message=jsonFromPatch(p), event='patch')
-                
-    def addSseHandler(self, handler):
-        log.info('addSseHandler %r %r', handler, handler.streamId)
-
-        # fail early if id doesn't match
-        sources = self._sourcesForHandler(handler)
-
-        self.handlers.add(handler)
-        
-        for source in sources:
-            if source not in self.clients and source != COLLECTOR:
-                self._localStatements.setSourceState(source, ROOM['connect'])
-                self.clients[source] = ReconnectingPatchSource(
-                    source, listener=lambda p, fullGraph, source=source: self._onPatch(
-                        source, p, fullGraph))
-        self._sendUpdatePatch(handler)
-        
-    def removeSseHandler(self, handler):
-        log.info('removeSseHandler %r', handler)
-
-        self.statements.discardHandler(handler)
-
-        for source in self._sourcesForHandler(handler):
-            for otherHandler in self.handlers:
-                if (otherHandler != handler and
-                    source in self._sourcesForHandler(otherHandler)):
-                    break
-            else:
-                self._stopClient(source)
-            
-        self.handlers.remove(handler)
-
-    def _stopClient(self, url):
-        if url == COLLECTOR:
-            return
-            
-        self.clients[url].stop()
-
-        self.statements.discardSource(url)
-        
-        self._localStatements.setSourceState(url, None)
-        del self.clients[url]
-        
-
-class SomeGraph(cyclone.sse.SSEHandler):
-    _handlerSerial = 0
-    def __init__(self, application, request):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.streamId = request.uri[len('/graph/'):]
-        self.graphClients = self.settings.graphClients
-        self.created = time.time()
-        
-        self._serial = SomeGraph._handlerSerial
-        SomeGraph._handlerSerial += 1
-
-    def __repr__(self):
-        return '<Handler #%s>' % self._serial
-
-    def stats(self):
-        print self.__dict__
-        return {
-            'created': self.created,
-            'ageHours': (time.time() - self.created) / 3600,
-            'streamId': self.streamId,
-            'remoteIp': self.request.remote_ip,
-            'userAgent': self.request.headers.get('user-agent'),
-        }
-        
-    def bind(self):
-        self.graphClients.addSseHandler(self)
-        
-    def unbind(self):
-        self.graphClients.removeSseHandler(self)
-
-class Stats(cyclone.web.RequestHandler):
-    def get(self):
-        try:
-            stats = self.settings.graphClients.stats()
-        except:
-            import traceback; traceback.print_exc()
-            raise
-        
-        self.write(json.dumps({'graphClients': stats}, indent=2))
-
-class Root(cyclone.web.RequestHandler):
-    def get(self):
-        self.write('<html><body>sse_collector</body></html>')
-        
-if __name__ == '__main__':
-
-    arg = docopt("""
-    Usage: sse_collector.py [options]
-
-    -v   Verbose
-    """)
-    
-    if arg['-v']:
-        import twisted.python.log
-        twisted.python.log.startLogging(sys.stdout)
-        log.setLevel(logging.DEBUG)
-
-
-    graphClients = GraphClients()
-        
-    reactor.listenTCP(
-        9072,
-        cyclone.web.Application(
-            handlers=[
-                (r'/', Root),
-                (r'/stats', Stats),
-                (r'/graph/(.*)', SomeGraph),
-            ],
-            graphClients=graphClients),
-        interface='::')
-    reactor.run()
--- a/service/reasoning/twisted_sse_demo/README.md	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-Twisted SSE demo
-================
-
-A twisted web server that implements server sent events (SSE)
-
-To run this demo:
-
-    python sse_twisted_web.py
-    
-Open up http://localhost:12000 in your browser.
-
-To publish events:
-
-    curl -d 'data=Hello!' http://localhost:12000/publish
-    
-You should see the data you publish in your browser. That's it!
--- a/service/reasoning/twisted_sse_demo/__init__.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-# from https://github.com/juggernaut/twisted-sse-demo
--- a/service/reasoning/twisted_sse_demo/api_example.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-import time
-
-from eventsource import EventSource
-
-EVENTSOURCE_URL = 'http://localhost:12000/subscribe'
-
-def onmessage(data):
-    print 'Got payload with data %s' % data
-
-if __name__ == '__main__':
-    eventSource = EventSource(EVENTSOURCE_URL)
-    eventSource.onmessage(onmessage, callInThread=True)
-    time.sleep(20)
--- a/service/reasoning/twisted_sse_demo/eventsource.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,76 +0,0 @@
-from crochet import setup, run_in_reactor
-from twisted.internet import reactor
-from twisted.internet.defer import Deferred
-from twisted.web.client import Agent
-from twisted.web.http_headers import Headers
-
-from sse_client import EventSourceProtocol
-
-setup()
-
-
-class EventSource(object):
-    """
-    The main EventSource class
-    """
-    def __init__(self, url):
-        self.url = url
-        self.protocol = EventSourceProtocol()
-        self.errorHandler = None
-        self.stashedError = None
-        self.connect()
-
-    @run_in_reactor
-    def connect(self):
-        """
-        Connect to the event source URL
-        """
-        agent = Agent(reactor)
-        d = agent.request(
-            'GET',
-            self.url,
-            Headers({
-                'User-Agent': ['Twisted SSE Client'],
-                'Cache-Control': ['no-cache'],
-                'Accept': ['text/event-stream; charset=utf-8'],
-            }),
-            None)
-        d.addErrback(self.connectError)
-        d.addCallback(self.cbRequest)
-
-    def cbRequest(self, response):
-        if response.code != 200:
-            self.callErrorHandler("non 200 response received: %d" %
-                                  response.code)
-        else:
-            finished = Deferred()
-            self.protocol.setFinishedDeferred(finished)
-            response.deliverBody(self.protocol)
-            return finished
-
-    def connectError(self, ignored):
-        self.callErrorHandler("error connecting to endpoint: %s" % self.url)
-
-    def callErrorHandler(self, msg):
-        if self.errorHandler:
-            func, callInThread = self.errorHandler
-            if callInThread:
-                reactor.callInThread(func, msg)
-            else:
-                func(msg)
-        else:
-            self.stashedError = msg
-
-    def onerror(self, func, callInThread=False):
-        self.errorHandler = func, callInThread
-        if self.stashedError:
-            self.callErrorHandler(self.stashedError)
-
-    def onmessage(self, func, callInThread=False):
-        self.addEventListener('message', func, callInThread)
-
-    def addEventListener(self, event, func, callInThread=False):
-        callback = func
-        if callInThread:
-            callback = lambda data: reactor.callInThread(func, data)
-        self.protocol.addCallback(event, callback)
--- a/service/reasoning/twisted_sse_demo/requirements.txt	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-crochet
--- a/service/reasoning/twisted_sse_demo/sse_client.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,67 +0,0 @@
-from twisted.protocols.basic import LineReceiver
-
-
-class EventSourceProtocol(LineReceiver):
-    def __init__(self):
-        self.MAX_LENGTH = 1 << 20
-        self.callbacks = {}
-        self.finished = None
-        # Initialize the event and data buffers
-        self.event = 'message'
-        self.data = ''
-
-    def lineLengthExceeded(self, line):
-        print "line too long"
-        raise NotImplementedError
-
-    def setFinishedDeferred(self, d):
-        self.finished = d
-
-    def addCallback(self, event, func):
-        self.callbacks[event] = func
-        
-    def lineReceived(self, line):
-        if line == '':
-            # Dispatch event
-            self.dispatchEvent()
-        else:
-            try:
-                field, value = line.split(':', 1)
-                # If value starts with a space, strip it.
-                value = lstrip(value)
-            except ValueError:
-                # We got a line with no colon, treat it as a field(ignore)
-                return
-
-            if field == '':
-                # This is a comment; ignore
-                pass
-            elif field == 'data':
-                self.data += value + '\n'
-            elif field == 'event':
-                self.event = value
-            elif field == 'id':
-                # Not implemented
-                pass
-            elif field == 'retry':
-                # Not implemented
-                pass
-
-    def connectionLost(self, reason):
-        if self.finished:
-            self.finished.callback(None)
-
-    def dispatchEvent(self):
-        """
-        Dispatch the event
-        """
-        # If last character is LF, strip it.
-        if self.data.endswith('\n'):
-            self.data = self.data[:-1]
-        if self.event in self.callbacks:
-            self.callbacks[self.event](self.data)
-        self.data = ''
-        self.event = 'message'
-
-def lstrip(value):
-    return value[1:] if value.startswith(' ') else value
--- a/service/reasoning/twisted_sse_demo/sse_server.py	Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,95 +0,0 @@
-import sys
-
-from twisted.web import server, resource
-from twisted.internet import reactor
-from twisted.python import log
-
-
-class Root(resource.Resource):
-    """
-    Root resource; serves JavaScript
-    """
-    def getChild(self, name, request):
-        if name == '':
-            return self
-        return resource.Resource.getChild(self, name, request)
-
-    def render_GET(self, request):
-        return r"""
-        <html>
-            <head>
-                <script language="JavaScript">
-                        eventSource = new EventSource("http://localhost:12000/subscribe");
-                        eventSource.onmessage = function(event) {
-                            element = document.getElementById("event-data");
-                            element.innerHTML = event.data;
-                        };
-                    </script>
-            </head>
-            <body>
-                <h1> Welcome to the SSE demo </h1>
-                <h3> Event data: </h3>
-                <p id="event-data"></p>
-            </body>
-        </html>
-        """
-
-
-class Subscribe(resource.Resource):
-    """
-    Implements the subscribe resource
-    """
-    isLeaf = True
-
-    def __init__(self):
-        self.subscribers = set()
-
-    def render_GET(self, request):
-        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
-        request.setResponseCode(200)
-        self.subscribers.add(request)
-        d = request.notifyFinish()
-        d.addBoth(self.removeSubscriber)
-        log.msg("Adding subscriber...")
-        request.write("")
-        return server.NOT_DONE_YET
-
-    def publishToAll(self, data):
-        for subscriber in self.subscribers:
-            for line in data:
-                subscriber.write("data: %s\r\n" % line)
-            # NOTE: the last CRLF is required to dispatch the event at the client
-            subscriber.write("\r\n")
-
-    def removeSubscriber(self, subscriber):
-        if subscriber in self.subscribers:
-            log.msg("Removing subscriber..")
-            self.subscribers.remove(subscriber)
-
-
-class Publish(resource.Resource):
-    """
-    Implements the publish resource
-    """
-    isLeaf = True
-
-    def __init__(self, subscriber):
-        self.subscriber = subscriber
-
-    def render_POST(self, request):
-        if 'data' not in request.args:
-            request.setResponseCode(400)
-            return "The parameter 'data' must be set\n"
-        data = request.args.get('data')
-        self.subscriber.publishToAll(data)
-        return 'Thank you for publishing data %s\n' % '\n'.join(data)
-
-
-root = Root()
-subscribe = Subscribe()
-root.putChild('subscribe', subscribe)
-root.putChild('publish', Publish(subscribe))
-site = server.Site(root)
-reactor.listenTCP(12000, site)
-log.startLogging(sys.stdout)
-reactor.run()