# HG changeset patch
# User drewp@bigasterisk.com
# Date 1535960734 25200
# Node ID 7716b1810d6cad2d37318e7d08a92b769b781a84
# Parent a380561fd8a8ba4c77dd08f87de9819df9bd2fe2
reasoning & collector move into docker images
Ignore-this: 67e97d307eba96791cbe77e57c57ad57
diff -r a380561fd8a8 -r 7716b1810d6c lib/patchsource.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/patchsource.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,143 @@
+import sys, logging
+import traceback
+from twisted.internet import reactor, defer
+from twisted_sse_demo.eventsource import EventSource
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+
+sys.path.append("../../lib")
+from patchablegraph import patchFromJson
+
+sys.path.append("/my/proj/rdfdb")
+from rdfdb.patch import Patch
+
+log = logging.getLogger('fetch')
+
+class PatchSource(object):
+ """wrap EventSource so it emits Patch objects and has an explicit stop method."""
+ def __init__(self, url):
+ self.url = url
+
+ # add callbacks to these to learn if we failed to connect
+        # (approximately) or if the connection was unexpectedly lost
+ self.connectionFailed = defer.Deferred()
+ self.connectionLost = defer.Deferred()
+
+ self._listeners = set()
+ log.info('start read from %s', url)
+ # note: fullGraphReceived isn't guaranteed- the stream could
+ # start with patches
+ self._fullGraphReceived = False
+ self._eventSource = EventSource(url.toPython().encode('utf8'))
+ self._eventSource.protocol.delimiter = '\n'
+
+ self._eventSource.addEventListener('fullGraph', self._onFullGraph)
+ self._eventSource.addEventListener('patch', self._onPatch)
+ self._eventSource.onerror(self._onError)
+
+ origSet = self._eventSource.protocol.setFinishedDeferred
+ def sfd(d):
+ origSet(d)
+ d.addCallback(self._onDisconnect)
+ self._eventSource.protocol.setFinishedDeferred = sfd
+
+ def stats(self):
+ return {
+ 'url': self.url,
+ 'fullGraphReceived': self._fullGraphReceived,
+ }
+
+ def addPatchListener(self, func):
+ """
+ func(patch, fullGraph=[true if the patch is the initial fullgraph])
+ """
+ self._listeners.add(func)
+
+ def stop(self):
+ log.info('stop read from %s', self.url)
+ try:
+ self._eventSource.protocol.stopProducing() # needed?
+ except AttributeError:
+ pass
+ self._eventSource = None
+
+ def _onDisconnect(self, a):
+ log.debug('PatchSource._onDisconnect from %s', self.url)
+ # skip this if we're doing a stop?
+ self.connectionLost.callback(None)
+
+ def _onError(self, msg):
+ log.debug('PatchSource._onError from %s %r', self.url, msg)
+ if not self._fullGraphReceived:
+ self.connectionFailed.callback(msg)
+ else:
+ self.connectionLost.callback(msg)
+
+ def _onFullGraph(self, message):
+ try:
+ g = ConjunctiveGraph()
+ g.parse(StringInputSource(message), format='json-ld')
+ p = Patch(addGraph=g)
+ self._sendPatch(p, fullGraph=True)
+ except:
+ log.error(traceback.format_exc())
+ raise
+ self._fullGraphReceived = True
+
+ def _onPatch(self, message):
+ try:
+ p = patchFromJson(message)
+ self._sendPatch(p, fullGraph=False)
+ except:
+ log.error(traceback.format_exc())
+ raise
+
+ def _sendPatch(self, p, fullGraph):
+ log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
+ for lis in self._listeners:
+ lis(p, fullGraph=fullGraph)
+
+    def __del__(self):  # exceptions raised here are ignored by the interpreter, so log instead
+        if getattr(self, '_eventSource', None):  # attribute is missing if __init__ raised early
+            log.warning('PatchSource %s was dropped without stop()', self.url)
+
+class ReconnectingPatchSource(object):
+ """
+ PatchSource api, but auto-reconnects internally and takes listener
+ at init time to not miss any patches. You'll get another
+ fullGraph=True patch if we have to reconnect.
+
+ todo: generate connection stmts in here
+ """
+ def __init__(self, url, listener):
+ self.url = url
+ self._stopped = False
+ self._listener = listener
+ self._reconnect()
+
+ def _reconnect(self):
+ if self._stopped:
+ return
+ self._ps = PatchSource(self.url)
+ self._ps.addPatchListener(self._onPatch)
+ self._ps.connectionFailed.addCallback(self._onConnectionFailed)
+ self._ps.connectionLost.addCallback(self._onConnectionLost)
+
+ def _onPatch(self, p, fullGraph):
+ self._listener(p, fullGraph=fullGraph)
+
+ def stats(self):
+ return {
+ 'reconnectedPatchSource': self._ps.stats(),
+ }
+
+ def stop(self):
+ self._stopped = True
+ self._ps.stop()
+
+ def _onConnectionFailed(self, arg):
+ reactor.callLater(60, self._reconnect)
+
+ def _onConnectionLost(self, arg):
+ reactor.callLater(60, self._reconnect)
+
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/README.md
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/README.md Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,16 @@
+Twisted SSE demo
+================
+
+A twisted web server that implements server sent events (SSE)
+
+To run this demo:
+
+ python sse_twisted_web.py
+
+Open up http://localhost:12000 in your browser.
+
+To publish events:
+
+ curl -d 'data=Hello!' http://localhost:12000/publish
+
+You should see the data you publish in your browser. That's it!
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/__init__.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/__init__.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,1 @@
+# from https://github.com/juggernaut/twisted-sse-demo
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/api_example.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/api_example.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,13 @@
+import time
+
+from eventsource import EventSource
+
+EVENTSOURCE_URL = 'http://localhost:12000/subscribe'
+
+def onmessage(data):
+ print 'Got payload with data %s' % data
+
+if __name__ == '__main__':
+ eventSource = EventSource(EVENTSOURCE_URL)
+ eventSource.onmessage(onmessage, callInThread=True)
+ time.sleep(20)
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/eventsource.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/eventsource.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,76 @@
+from crochet import setup, run_in_reactor
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from sse_client import EventSourceProtocol
+
+setup()
+
+
+class EventSource(object):
+ """
+ The main EventSource class
+ """
+ def __init__(self, url):
+ self.url = url
+ self.protocol = EventSourceProtocol()
+ self.errorHandler = None
+ self.stashedError = None
+ self.connect()
+
+    @run_in_reactor
+    def connect(self):
+        """
+        Connect to the event source URL
+        """
+        agent = Agent(reactor)
+        d = agent.request(
+            'GET',
+            self.url,
+            Headers({
+                'User-Agent': ['Twisted SSE Client'],
+                'Cache-Control': ['no-cache'],
+                'Accept': ['text/event-stream; charset=utf-8'],
+            }),
+            None)
+        # one addCallbacks call: a handled connect failure must not fall through to cbRequest(None)
+        d.addCallbacks(self.cbRequest, self.connectError)
+
+ def cbRequest(self, response):
+ if response.code != 200:
+ self.callErrorHandler("non 200 response received: %d" %
+ response.code)
+ else:
+ finished = Deferred()
+ self.protocol.setFinishedDeferred(finished)
+ response.deliverBody(self.protocol)
+ return finished
+
+ def connectError(self, ignored):
+ self.callErrorHandler("error connecting to endpoint: %s" % self.url)
+
+ def callErrorHandler(self, msg):
+ if self.errorHandler:
+ func, callInThread = self.errorHandler
+ if callInThread:
+ reactor.callInThread(func, msg)
+ else:
+ func(msg)
+ else:
+ self.stashedError = msg
+
+ def onerror(self, func, callInThread=False):
+ self.errorHandler = func, callInThread
+ if self.stashedError:
+ self.callErrorHandler(self.stashedError)
+
+ def onmessage(self, func, callInThread=False):
+ self.addEventListener('message', func, callInThread)
+
+ def addEventListener(self, event, func, callInThread=False):
+ callback = func
+ if callInThread:
+ callback = lambda data: reactor.callInThread(func, data)
+ self.protocol.addCallback(event, callback)
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/requirements.txt
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/requirements.txt Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,1 @@
+crochet
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/sse_client.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/sse_client.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,67 @@
+from twisted.protocols.basic import LineReceiver
+
+
+class EventSourceProtocol(LineReceiver):
+ def __init__(self):
+ self.MAX_LENGTH = 1 << 20
+ self.callbacks = {}
+ self.finished = None
+ # Initialize the event and data buffers
+ self.event = 'message'
+ self.data = ''
+
+ def lineLengthExceeded(self, line):
+ print "line too long"
+ raise NotImplementedError
+
+ def setFinishedDeferred(self, d):
+ self.finished = d
+
+ def addCallback(self, event, func):
+ self.callbacks[event] = func
+
+ def lineReceived(self, line):
+ if line == '':
+ # Dispatch event
+ self.dispatchEvent()
+ else:
+ try:
+ field, value = line.split(':', 1)
+ # If value starts with a space, strip it.
+ value = lstrip(value)
+ except ValueError:
+ # We got a line with no colon, treat it as a field(ignore)
+ return
+
+ if field == '':
+ # This is a comment; ignore
+ pass
+ elif field == 'data':
+ self.data += value + '\n'
+ elif field == 'event':
+ self.event = value
+ elif field == 'id':
+ # Not implemented
+ pass
+ elif field == 'retry':
+ # Not implemented
+ pass
+
+ def connectionLost(self, reason):
+ if self.finished:
+ self.finished.callback(None)
+
+ def dispatchEvent(self):
+ """
+ Dispatch the event
+ """
+ # If last character is LF, strip it.
+ if self.data.endswith('\n'):
+ self.data = self.data[:-1]
+ if self.event in self.callbacks:
+ self.callbacks[self.event](self.data)
+ self.data = ''
+ self.event = 'message'
+
+def lstrip(value):
+ return value[1:] if value.startswith(' ') else value
diff -r a380561fd8a8 -r 7716b1810d6c lib/twisted_sse_demo/sse_server.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse_demo/sse_server.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,95 @@
+import sys
+
+from twisted.web import server, resource
+from twisted.internet import reactor
+from twisted.python import log
+
+
+class Root(resource.Resource):
+ """
+ Root resource; serves JavaScript
+ """
+ def getChild(self, name, request):
+ if name == '':
+ return self
+ return resource.Resource.getChild(self, name, request)
+
+ def render_GET(self, request):
+ return r"""
+
+
+
+
+
+ Welcome to the SSE demo
+ Event data:
+
+
+
+ """
+
+
+class Subscribe(resource.Resource):
+ """
+ Implements the subscribe resource
+ """
+ isLeaf = True
+
+ def __init__(self):
+ self.subscribers = set()
+
+ def render_GET(self, request):
+ request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
+ request.setResponseCode(200)
+ self.subscribers.add(request)
+ d = request.notifyFinish()
+ d.addBoth(self.removeSubscriber)
+ log.msg("Adding subscriber...")
+ request.write("")
+ return server.NOT_DONE_YET
+
+ def publishToAll(self, data):
+ for subscriber in self.subscribers:
+ for line in data:
+ subscriber.write("data: %s\r\n" % line)
+ # NOTE: the last CRLF is required to dispatch the event at the client
+ subscriber.write("\r\n")
+
+ def removeSubscriber(self, subscriber):
+ if subscriber in self.subscribers:
+ log.msg("Removing subscriber..")
+ self.subscribers.remove(subscriber)
+
+
+class Publish(resource.Resource):
+ """
+ Implements the publish resource
+ """
+ isLeaf = True
+
+ def __init__(self, subscriber):
+ self.subscriber = subscriber
+
+ def render_POST(self, request):
+ if 'data' not in request.args:
+ request.setResponseCode(400)
+ return "The parameter 'data' must be set\n"
+ data = request.args.get('data')
+ self.subscriber.publishToAll(data)
+ return 'Thank you for publishing data %s\n' % '\n'.join(data)
+
+
+root = Root()
+subscribe = Subscribe()
+root.putChild('subscribe', subscribe)
+root.putChild('publish', Publish(subscribe))
+site = server.Site(root)
+reactor.listenTCP(12000, site)
+log.startLogging(sys.stdout)
+reactor.run()
diff -r a380561fd8a8 -r 7716b1810d6c service/collector/Dockerfile
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/Dockerfile Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,13 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip install -r requirements.txt
+
+COPY twisted_sse_demo ./twisted_sse_demo
+COPY *.py req* ./
+
+EXPOSE 9072
+
+CMD [ "python", "./sse_collector.py" ]
diff -r a380561fd8a8 -r 7716b1810d6c service/collector/makefile
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/makefile Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,22 @@
+JOB=collector
+PORT=9072
+
+TAG=bang6:5000/${JOB}_x86:latest
+
+build_image:
+ rm -rf tmp_ctx
+ mkdir -p tmp_ctx
+ cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* tmp_ctx
+ docker build --network=host -t ${TAG} tmp_ctx
+ docker push ${TAG}
+ rm -r tmp_ctx
+
+shell:
+ docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
+
+local_run:
+ docker run --rm -it -p ${PORT}:${PORT} \
+ -v `pwd`:/mnt \
+ --net=host \
+ ${TAG} \
+ python /mnt/sse_collector.py -v
diff -r a380561fd8a8 -r 7716b1810d6c service/collector/requirements.txt
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/requirements.txt Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,16 @@
+crochet==1.5.0
+cyclone
+docopt
+ipdb
+service_identity
+twisted
+
+#rdflib==4.2.2
+git+http://github.com/drewp/rdflib.git@5fa18be1231a5e4dfc86ec28f2f754158c6f6f0b#egg=rdflib
+
+#rdflib-jsonld==0.4.0
+git+http://github.com/RDFLib/rdflib-jsonld@cc5f005b222105724cd59c6069df9982fbd28c98#egg=rdflib_jsonld
+
+git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi
+git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
+https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
diff -r a380561fd8a8 -r 7716b1810d6c service/collector/sse_collector.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/collector/sse_collector.py Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,366 @@
+from __future__ import division
+"""
+requesting /graph/foo returns an SSE patch stream that's the
+result of fetching multiple other SSE patch streams. The result stream
+may include new statements injected by this service.
+
+Future:
+- filter out unneeded stmts from the sources
+- give a time resolution and concatenate any patches that come faster than that res
+"""
+from crochet import no_setup
+no_setup()
+
+import sys, logging, collections, json, time
+from twisted.internet import reactor
+import cyclone.web, cyclone.sse
+from rdflib import URIRef, Namespace
+from docopt import docopt
+
+sys.path.append('/opt') # docker is putting ../../lib/ here
+from logsetup import log
+from patchablegraph import jsonFromPatch
+
+from rdfdb.patch import Patch
+
+from patchsource import ReconnectingPatchSource
+
+ROOM = Namespace("http://projects.bigasterisk.com/room/")
+COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
+
+from sse_collector_config import config
+
+class LocalStatements(object):
+ """
+ functions that make statements originating from sse_collector itself
+ """
+ def __init__(self, applyPatch):
+ self.applyPatch = applyPatch
+ self._sourceState = {} # source: state URIRef
+
+ def setSourceState(self, source, state):
+ """
+ add a patch to the COLLECTOR graph about the state of this
+ source. state=None to remove the source.
+ """
+ oldState = self._sourceState.get(source, None)
+ if state == oldState:
+ return
+ log.info('source state %s -> %s', source, state)
+ if oldState is None:
+ self._sourceState[source] = state
+ self.applyPatch(COLLECTOR, Patch(addQuads=[
+ (COLLECTOR, ROOM['source'], source, COLLECTOR),
+ (source, ROOM['state'], state, COLLECTOR),
+ ]))
+ elif state is None:
+ del self._sourceState[source]
+ self.applyPatch(COLLECTOR, Patch(delQuads=[
+ (COLLECTOR, ROOM['source'], source, COLLECTOR),
+ (source, ROOM['state'], oldState, COLLECTOR),
+ ]))
+ else:
+ self._sourceState[source] = state
+ self.applyPatch(COLLECTOR, Patch(
+ addQuads=[
+ (source, ROOM['state'], state, COLLECTOR),
+ ],
+ delQuads=[
+ (source, ROOM['state'], oldState, COLLECTOR),
+ ]))
+
+def abbrevTerm(t):
+ if isinstance(t, URIRef):
+ return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
+ .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
+ return t
+
+def abbrevStmt(stmt):
+ return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
+
+class ActiveStatements(object):
+ def __init__(self):
+
+ # This table holds statements asserted by any of our sources
+ # plus local statements that we introduce (source is
+ # http://bigasterisk.com/sse_collector/).
+        self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)
+
+ def stats(self):
+ return {
+'len': len(self.statements),
+ }
+
+    def _postDeleteStatements(self):  # context manager: queue stmts, delete them after the loop
+        statements = self.statements
+        class PostDeleter(object):
+            def __enter__(self):
+                self._garbage = []
+                return self
+            def add(self, stmt):
+                self._garbage.append(stmt)
+            def __exit__(self, type, value, traceback):
+                if type is not None:
+                    return False  # body failed: delete nothing, let the caller propagate
+                for stmt in self._garbage:
+                    del statements[stmt]
+        return PostDeleter()
+
+ def pprintTable(self):
+ for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
+ print "%03d. %-80s from %s to %s" % (
+ i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)
+
+ def makeSyncPatch(self, handler, sources):
+ # todo: this could run all handlers at once, which is how we use it anyway
+ adds = []
+ dels = []
+
+ with self._postDeleteStatements() as garbage:
+ for stmt, (stmtSources, handlers) in self.statements.iteritems():
+ belongsInHandler = not set(sources).isdisjoint(stmtSources)
+ handlerHasIt = handler in handlers
+ #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
+ if belongsInHandler and not handlerHasIt:
+ adds.append(stmt)
+ handlers.add(handler)
+ elif not belongsInHandler and handlerHasIt:
+ dels.append(stmt)
+ handlers.remove(handler)
+ if not handlers and not stmtSources:
+ garbage.add(stmt)
+
+ return Patch(addQuads=adds, delQuads=dels)
+
+ def applySourcePatch(self, source, p):
+ for stmt in p.addQuads:
+ sourceUrls, handlers = self.statements[stmt]
+ if source in sourceUrls:
+ raise ValueError("%s added stmt that it already had: %s" %
+ (source, abbrevStmt(stmt)))
+ sourceUrls.add(source)
+
+ with self._postDeleteStatements() as garbage:
+ for stmt in p.delQuads:
+ sourceUrls, handlers = self.statements[stmt]
+ if source not in sourceUrls:
+ raise ValueError("%s deleting stmt that it didn't have: %s" %
+ (source, abbrevStmt(stmt)))
+ sourceUrls.remove(source)
+ # this is rare, since some handler probably still has
+ # the stmt we're deleting, but it can happen e.g. when
+ # a handler was just deleted
+ if not sourceUrls and not handlers:
+ garbage.add(stmt)
+
+ def replaceSourceStatements(self, source, stmts):
+ log.debug('replaceSourceStatements with %s stmts', len(stmts))
+ newStmts = set(stmts)
+
+ with self._postDeleteStatements() as garbage:
+ for stmt, (sources, handlers) in self.statements.iteritems():
+ if source in sources:
+ if stmt not in stmts:
+ sources.remove(source)
+ if not sources and not handlers:
+ garbage.add(stmt)
+ else:
+ if stmt in stmts:
+ sources.add(source)
+ newStmts.discard(stmt)
+
+ self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
+
+ def discardHandler(self, handler):
+ with self._postDeleteStatements() as garbage:
+ for stmt, (sources, handlers) in self.statements.iteritems():
+ handlers.discard(handler)
+ if not sources and not handlers:
+ garbage.add(stmt)
+
+ def discardSource(self, source):
+ with self._postDeleteStatements() as garbage:
+ for stmt, (sources, handlers) in self.statements.iteritems():
+ sources.discard(source)
+ if not sources and not handlers:
+ garbage.add(stmt)
+
+class GraphClients(object):
+ """
+ All the active PatchSources and SSEHandlers
+
+ To handle all the overlapping-statement cases, we store a set of
+ true statements along with the sources that are currently
+ asserting them and the requesters who currently know them. As
+ statements come and go, we make patches to send to requesters.
+ """
+ def __init__(self):
+ self.clients = {} # url: PatchSource (COLLECTOR is not listed)
+ self.handlers = set() # handler
+ self.statements = ActiveStatements()
+
+ self._localStatements = LocalStatements(self._onPatch)
+
+ def stats(self):
+ return {
+ 'clients': [ps.stats() for ps in self.clients.values()],
+ 'sseHandlers': [h.stats() for h in self.handlers],
+ 'statements': self.statements.stats(),
+ }
+
+ def _sourcesForHandler(self, handler):
+ streamId = handler.streamId
+ matches = [s for s in config['streams'] if s['id'] == streamId]
+ if len(matches) != 1:
+ raise ValueError("%s matches for %r" % (len(matches), streamId))
+ return map(URIRef, matches[0]['sources']) + [COLLECTOR]
+
+ def _onPatch(self, source, p, fullGraph=False):
+ if fullGraph:
+ # a reconnect may need to resend the full graph even
+ # though we've already sent some statements
+ self.statements.replaceSourceStatements(source, p.addQuads)
+ else:
+ self.statements.applySourcePatch(source, p)
+
+ self._sendUpdatePatch()
+
+ if log.isEnabledFor(logging.DEBUG):
+ self.statements.pprintTable()
+
+ if source != COLLECTOR:
+ self._localStatements.setSourceState(
+ source,
+ ROOM['fullGraphReceived'] if fullGraph else
+ ROOM['patchesReceived'])
+
+ def _sendUpdatePatch(self, handler=None):
+ """
+ send a patch event out this handler to bring it up to date with
+ self.statements
+ """
+ # reduce loops here- prepare all patches at once
+ for h in (self.handlers if handler is None else [handler]):
+ p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
+ if not p.isNoop():
+ log.debug("send patch %s to %s", p.shortSummary(), h)
+ # This can be a giant line, which was a problem once. Might be
+ # nice for this service to try to break it up into multiple sends,
+ # although there's no guarantee at all since any single stmt
+ # could be any length.
+ h.sendEvent(message=jsonFromPatch(p), event='patch')
+
+ def addSseHandler(self, handler):
+ log.info('addSseHandler %r %r', handler, handler.streamId)
+
+ # fail early if id doesn't match
+ sources = self._sourcesForHandler(handler)
+
+ self.handlers.add(handler)
+
+ for source in sources:
+ if source not in self.clients and source != COLLECTOR:
+ self._localStatements.setSourceState(source, ROOM['connect'])
+ self.clients[source] = ReconnectingPatchSource(
+ source, listener=lambda p, fullGraph, source=source: self._onPatch(
+ source, p, fullGraph))
+ self._sendUpdatePatch(handler)
+
+ def removeSseHandler(self, handler):
+ log.info('removeSseHandler %r', handler)
+
+ self.statements.discardHandler(handler)
+
+ for source in self._sourcesForHandler(handler):
+ for otherHandler in self.handlers:
+ if (otherHandler != handler and
+ source in self._sourcesForHandler(otherHandler)):
+ break
+ else:
+ self._stopClient(source)
+
+ self.handlers.remove(handler)
+
+ def _stopClient(self, url):
+ if url == COLLECTOR:
+ return
+
+ self.clients[url].stop()
+
+ self.statements.discardSource(url)
+
+ self._localStatements.setSourceState(url, None)
+ del self.clients[url]
+
+
+class SomeGraph(cyclone.sse.SSEHandler):
+ _handlerSerial = 0
+ def __init__(self, application, request):
+ cyclone.sse.SSEHandler.__init__(self, application, request)
+ self.streamId = request.uri[len('/graph/'):]
+ self.graphClients = self.settings.graphClients
+ self.created = time.time()
+
+ self._serial = SomeGraph._handlerSerial
+ SomeGraph._handlerSerial += 1
+
+    def __repr__(self):  # shown by the %r log calls and by pprintTable's handler column
+        return '<Handler #%s>' % self._serial
+
+    def stats(self):
+        """Return a dict describing this handler; Stats.get dumps it as JSON."""
+        return {
+            'created': self.created,
+            'ageHours': (time.time() - self.created) / 3600,
+            'streamId': self.streamId,
+            'remoteIp': self.request.remote_ip,
+            'userAgent': self.request.headers.get('user-agent'),
+        }
+
+ def bind(self):
+ self.graphClients.addSseHandler(self)
+
+ def unbind(self):
+ self.graphClients.removeSseHandler(self)
+
+class Stats(cyclone.web.RequestHandler):
+ def get(self):
+ try:
+ stats = self.settings.graphClients.stats()
+ except:
+ import traceback; traceback.print_exc()
+ raise
+
+ self.write(json.dumps({'graphClients': stats}, indent=2))
+
+class Root(cyclone.web.RequestHandler):
+ def get(self):
+ self.write('sse_collector')
+
+if __name__ == '__main__':
+
+ arg = docopt("""
+ Usage: sse_collector.py [options]
+
+ -v Verbose
+ """)
+
+ if arg['-v']:
+ import twisted.python.log
+ twisted.python.log.startLogging(sys.stdout)
+ log.setLevel(logging.DEBUG)
+
+
+ graphClients = GraphClients()
+
+ reactor.listenTCP(
+ 9072,
+ cyclone.web.Application(
+ handlers=[
+ (r'/', Root),
+ (r'/stats', Stats),
+ (r'/graph/(.*)', SomeGraph),
+ ],
+ graphClients=graphClients),
+ interface='::')
+ reactor.run()
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/Dockerfile
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/Dockerfile Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,14 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip install -r requirements.txt
+
+COPY twisted_sse_demo ./twisted_sse_demo
+COPY *.n3 *.py req* ./
+COPY input ./input
+
+EXPOSE 9071
+
+CMD [ "python", "./reasoning.py" ]
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/inputgraph.py
--- a/service/reasoning/inputgraph.py Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/inputgraph.py Mon Sep 03 00:45:34 2018 -0700
@@ -12,8 +12,8 @@
from patchsource import ReconnectingPatchSource
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.rdflibpatch import patchQuads
+sys.path.append("/my/proj/rdfdb")
+from rdfdb.rdflibpatch import patchQuads
log = logging.getLogger('fetch')
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/makefile
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/makefile Mon Sep 03 00:45:34 2018 -0700
@@ -0,0 +1,23 @@
+JOB=reasoning
+PORT=9071
+
+TAG=bang6:5000/${JOB}_x86:latest
+
+build_image:
+ rm -rf tmp_ctx
+ mkdir -p tmp_ctx
+ cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py *.n3 input req* tmp_ctx
+ rsync -a input tmp_ctx/
+ docker build --network=host -t ${TAG} tmp_ctx
+ docker push ${TAG}
+ rm -r tmp_ctx
+
+shell:
+ docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash
+
+local_run:
+ docker run --rm -it -p ${PORT}:${PORT} \
+ -v `pwd`:/mnt \
+ --net=host \
+ ${TAG} \
+ python /mnt/${JOB}.py -iro
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/patchsource.py
--- a/service/reasoning/patchsource.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,143 +0,0 @@
-import sys, logging
-import traceback
-from twisted.internet import reactor, defer
-from twisted_sse_demo.eventsource import EventSource
-from rdflib import ConjunctiveGraph
-from rdflib.parser import StringInputSource
-
-sys.path.append("../../lib")
-from patchablegraph import patchFromJson
-
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.patch import Patch
-
-log = logging.getLogger('fetch')
-
-class PatchSource(object):
- """wrap EventSource so it emits Patch objects and has an explicit stop method."""
- def __init__(self, url):
- self.url = url
-
- # add callbacks to these to learn if we failed to connect
- # (approximately) or if the ccnnection was unexpectedly lost
- self.connectionFailed = defer.Deferred()
- self.connectionLost = defer.Deferred()
-
- self._listeners = set()
- log.info('start read from %s', url)
- # note: fullGraphReceived isn't guaranteed- the stream could
- # start with patches
- self._fullGraphReceived = False
- self._eventSource = EventSource(url.toPython().encode('utf8'))
- self._eventSource.protocol.delimiter = '\n'
-
- self._eventSource.addEventListener('fullGraph', self._onFullGraph)
- self._eventSource.addEventListener('patch', self._onPatch)
- self._eventSource.onerror(self._onError)
-
- origSet = self._eventSource.protocol.setFinishedDeferred
- def sfd(d):
- origSet(d)
- d.addCallback(self._onDisconnect)
- self._eventSource.protocol.setFinishedDeferred = sfd
-
- def stats(self):
- return {
- 'url': self.url,
- 'fullGraphReceived': self._fullGraphReceived,
- }
-
- def addPatchListener(self, func):
- """
- func(patch, fullGraph=[true if the patch is the initial fullgraph])
- """
- self._listeners.add(func)
-
- def stop(self):
- log.info('stop read from %s', self.url)
- try:
- self._eventSource.protocol.stopProducing() # needed?
- except AttributeError:
- pass
- self._eventSource = None
-
- def _onDisconnect(self, a):
- log.debug('PatchSource._onDisconnect from %s', self.url)
- # skip this if we're doing a stop?
- self.connectionLost.callback(None)
-
- def _onError(self, msg):
- log.debug('PatchSource._onError from %s %r', self.url, msg)
- if not self._fullGraphReceived:
- self.connectionFailed.callback(msg)
- else:
- self.connectionLost.callback(msg)
-
- def _onFullGraph(self, message):
- try:
- g = ConjunctiveGraph()
- g.parse(StringInputSource(message), format='json-ld')
- p = Patch(addGraph=g)
- self._sendPatch(p, fullGraph=True)
- except:
- log.error(traceback.format_exc())
- raise
- self._fullGraphReceived = True
-
- def _onPatch(self, message):
- try:
- p = patchFromJson(message)
- self._sendPatch(p, fullGraph=False)
- except:
- log.error(traceback.format_exc())
- raise
-
- def _sendPatch(self, p, fullGraph):
- log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph)
- for lis in self._listeners:
- lis(p, fullGraph=fullGraph)
-
- def __del__(self):
- if self._eventSource:
- raise ValueError
-
-class ReconnectingPatchSource(object):
- """
- PatchSource api, but auto-reconnects internally and takes listener
- at init time to not miss any patches. You'll get another
- fullGraph=True patch if we have to reconnect.
-
- todo: generate connection stmts in here
- """
- def __init__(self, url, listener):
- self.url = url
- self._stopped = False
- self._listener = listener
- self._reconnect()
-
- def _reconnect(self):
- if self._stopped:
- return
- self._ps = PatchSource(self.url)
- self._ps.addPatchListener(self._onPatch)
- self._ps.connectionFailed.addCallback(self._onConnectionFailed)
- self._ps.connectionLost.addCallback(self._onConnectionLost)
-
- def _onPatch(self, p, fullGraph):
- self._listener(p, fullGraph=fullGraph)
-
- def stats(self):
- return {
- 'reconnectedPatchSource': self._ps.stats(),
- }
-
- def stop(self):
- self._stopped = True
- self._ps.stop()
-
- def _onConnectionFailed(self, arg):
- reactor.callLater(60, self._reconnect)
-
- def _onConnectionLost(self, arg):
- reactor.callLater(60, self._reconnect)
-
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/reasoning.py
--- a/service/reasoning/reasoning.py Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/reasoning.py Mon Sep 03 00:45:34 2018 -0700
@@ -14,14 +14,12 @@
When do we gather? The services should be able to trigger us, perhaps
with PSHB, that their graph has changed.
"""
-
-
from crochet import no_setup
no_setup()
-
import json, time, traceback, sys
from logging import getLogger, DEBUG, WARN
+sys.path.append('/opt') # docker is putting lib/ here
from colorlog import ColoredFormatter
from docopt import docopt
@@ -38,7 +36,6 @@
from inputgraph import InputGraph
from escapeoutputstatements import unquoteOutputStatements
-sys.path.append("../../lib")
from logsetup import log
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/requirements.txt
--- a/service/reasoning/requirements.txt Mon Apr 16 22:18:49 2018 -0700
+++ b/service/reasoning/requirements.txt Mon Sep 03 00:45:34 2018 -0700
@@ -1,11 +1,13 @@
-twisted==17.1.0
-cyclone==1.1
-ipdb==0.10.3
-docopt==0.6.2
+colorlog==2.6.0
+crochet==1.5.0
+cyclone
+docopt
+ipdb
+service_identity
+twisted
+
rdflib==4.2.2
+rdflib_jsonld==0.4.0
git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi
git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales
-service_identity==17.0.0
-crochet==1.5.0
-rdflib_jsonld==0.4.0
-
+https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/sse_collector.py
--- a/service/reasoning/sse_collector.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,396 +0,0 @@
-from __future__ import division
-"""
-requesting /graph/foo returns an SSE patch stream that's the
-result of fetching multiple other SSE patch streams. The result stream
-may include new statements injected by this service.
-
-Future:
-- filter out unneeded stmts from the sources
-- give a time resolution and concatenate any patches that come faster than that res
-"""
-
-config = {
- 'streams': [
- {'id': 'home',
- 'sources': [
-# should be from :reasoning :source ?s
-'http://garage:9059/graph/events', # "garage pi"
-'http://kitchen:9059/graph/events', # "kitchen pi"
-'http://living:9059/graph/events', # "living room pi"
-'http://slash:9059/graph/events', # "slash arduino"
-'http://bed:9059/graph/events', # "bed pi"
-'http://brace6:9059/graph/events', # "brace arduino"
-'http://changing:9059/graph/events', # "changing pi"
-'http://bang:9075/graph/events', # "env"
-'http://bang:9070/graph/events', # "wifi usage"
-'http://bang:9099/graph/events', # "trails"
-'http://dash:9095/graph/events', # "dash monitor"
-'http://dash:9107/graph/events', # "dash x idle"
-'http://brace6:9095/graph/events', # "brace monitor"
-'http://brace6:9107/graph/events', # "brace x idle"
-'http://slash:9095/graph/events', # "slash monitor"
-'http://slash:9107/graph/events', # "slash x idle"
-
-
-
- ]
- },
- ]
-}
-
-from crochet import no_setup
-no_setup()
-
-import sys, logging, collections, json, time
-from twisted.internet import reactor
-import cyclone.web, cyclone.sse
-from rdflib import URIRef, Namespace
-from docopt import docopt
-
-
-sys.path.append("../../lib")
-from logsetup import log
-from patchablegraph import jsonFromPatch
-
-sys.path.append("/my/proj/light9")
-from light9.rdfdb.patch import Patch
-
-from patchsource import ReconnectingPatchSource
-
-ROOM = Namespace("http://projects.bigasterisk.com/room/")
-COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/')
-
-class LocalStatements(object):
- """
- functions that make statements originating from sse_collector itself
- """
- def __init__(self, applyPatch):
- self.applyPatch = applyPatch
- self._sourceState = {} # source: state URIRef
-
- def setSourceState(self, source, state):
- """
- add a patch to the COLLECTOR graph about the state of this
- source. state=None to remove the source.
- """
- oldState = self._sourceState.get(source, None)
- if state == oldState:
- return
- log.info('source state %s -> %s', source, state)
- if oldState is None:
- self._sourceState[source] = state
- self.applyPatch(COLLECTOR, Patch(addQuads=[
- (COLLECTOR, ROOM['source'], source, COLLECTOR),
- (source, ROOM['state'], state, COLLECTOR),
- ]))
- elif state is None:
- del self._sourceState[source]
- self.applyPatch(COLLECTOR, Patch(delQuads=[
- (COLLECTOR, ROOM['source'], source, COLLECTOR),
- (source, ROOM['state'], oldState, COLLECTOR),
- ]))
- else:
- self._sourceState[source] = state
- self.applyPatch(COLLECTOR, Patch(
- addQuads=[
- (source, ROOM['state'], state, COLLECTOR),
- ],
- delQuads=[
- (source, ROOM['state'], oldState, COLLECTOR),
- ]))
-
-def abbrevTerm(t):
- if isinstance(t, URIRef):
- return (t.replace('http://projects.bigasterisk.com/room/', 'room:')
- .replace('http://bigasterisk.com/sse_collector/', 'sc:'))
- return t
-
-def abbrevStmt(stmt):
- return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt))
-
-class ActiveStatements(object):
- def __init__(self):
-
- # This table holds statements asserted by any of our sources
- # plus local statements that we introduce (source is
- # http://bigasterisk.com/sse_collector/).
- self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)`
-
- def stats(self):
- return {
-'len': len(self.statements),
- }
-
- def _postDeleteStatements(self):
- statements = self.statements
- class PostDeleter(object):
- def __enter__(self):
- self._garbage = []
- return self
- def add(self, stmt):
- self._garbage.append(stmt)
- def __exit__(self, type, value, traceback):
- if type is not None:
- raise
- for stmt in self._garbage:
- del statements[stmt]
- return PostDeleter()
-
- def pprintTable(self):
- for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())):
- print "%03d. %-80s from %s to %s" % (
- i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers)
-
- def makeSyncPatch(self, handler, sources):
- # todo: this could run all handlers at once, which is how we use it anyway
- adds = []
- dels = []
-
- with self._postDeleteStatements() as garbage:
- for stmt, (stmtSources, handlers) in self.statements.iteritems():
- belongsInHandler = not set(sources).isdisjoint(stmtSources)
- handlerHasIt = handler in handlers
- #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt)
- if belongsInHandler and not handlerHasIt:
- adds.append(stmt)
- handlers.add(handler)
- elif not belongsInHandler and handlerHasIt:
- dels.append(stmt)
- handlers.remove(handler)
- if not handlers and not stmtSources:
- garbage.add(stmt)
-
- return Patch(addQuads=adds, delQuads=dels)
-
- def applySourcePatch(self, source, p):
- for stmt in p.addQuads:
- sourceUrls, handlers = self.statements[stmt]
- if source in sourceUrls:
- raise ValueError("%s added stmt that it already had: %s" %
- (source, abbrevStmt(stmt)))
- sourceUrls.add(source)
-
- with self._postDeleteStatements() as garbage:
- for stmt in p.delQuads:
- sourceUrls, handlers = self.statements[stmt]
- if source not in sourceUrls:
- raise ValueError("%s deleting stmt that it didn't have: %s" %
- (source, abbrevStmt(stmt)))
- sourceUrls.remove(source)
- # this is rare, since some handler probably still has
- # the stmt we're deleting, but it can happen e.g. when
- # a handler was just deleted
- if not sourceUrls and not handlers:
- garbage.add(stmt)
-
- def replaceSourceStatements(self, source, stmts):
- log.debug('replaceSourceStatements with %s stmts', len(stmts))
- newStmts = set(stmts)
-
- with self._postDeleteStatements() as garbage:
- for stmt, (sources, handlers) in self.statements.iteritems():
- if source in sources:
- if stmt not in stmts:
- sources.remove(source)
- if not sources and not handlers:
- garbage.add(stmt)
- else:
- if stmt in stmts:
- sources.add(source)
- newStmts.discard(stmt)
-
- self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[]))
-
- def discardHandler(self, handler):
- with self._postDeleteStatements() as garbage:
- for stmt, (sources, handlers) in self.statements.iteritems():
- handlers.discard(handler)
- if not sources and not handlers:
- garbage.add(stmt)
-
- def discardSource(self, source):
- with self._postDeleteStatements() as garbage:
- for stmt, (sources, handlers) in self.statements.iteritems():
- sources.discard(source)
- if not sources and not handlers:
- garbage.add(stmt)
-
-class GraphClients(object):
- """
- All the active PatchSources and SSEHandlers
-
- To handle all the overlapping-statement cases, we store a set of
- true statements along with the sources that are currently
- asserting them and the requesters who currently know them. As
- statements come and go, we make patches to send to requesters.
- """
- def __init__(self):
- self.clients = {} # url: PatchSource (COLLECTOR is not listed)
- self.handlers = set() # handler
- self.statements = ActiveStatements()
-
- self._localStatements = LocalStatements(self._onPatch)
-
- def stats(self):
- return {
- 'clients': [ps.stats() for ps in self.clients.values()],
- 'sseHandlers': [h.stats() for h in self.handlers],
- 'statements': self.statements.stats(),
- }
-
- def _sourcesForHandler(self, handler):
- streamId = handler.streamId
- matches = [s for s in config['streams'] if s['id'] == streamId]
- if len(matches) != 1:
- raise ValueError("%s matches for %r" % (len(matches), streamId))
- return map(URIRef, matches[0]['sources']) + [COLLECTOR]
-
- def _onPatch(self, source, p, fullGraph=False):
- if fullGraph:
- # a reconnect may need to resend the full graph even
- # though we've already sent some statements
- self.statements.replaceSourceStatements(source, p.addQuads)
- else:
- self.statements.applySourcePatch(source, p)
-
- self._sendUpdatePatch()
-
- if log.isEnabledFor(logging.DEBUG):
- self.statements.pprintTable()
-
- if source != COLLECTOR:
- self._localStatements.setSourceState(
- source,
- ROOM['fullGraphReceived'] if fullGraph else
- ROOM['patchesReceived'])
-
- def _sendUpdatePatch(self, handler=None):
- """
- send a patch event out this handler to bring it up to date with
- self.statements
- """
- # reduce loops here- prepare all patches at once
- for h in (self.handlers if handler is None else [handler]):
- p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h))
- if not p.isNoop():
- log.debug("send patch %s to %s", p.shortSummary(), h)
- # This can be a giant line, which was a problem once. Might be
- # nice for this service to try to break it up into multiple sends,
- # although there's no guarantee at all since any single stmt
- # could be any length.
- h.sendEvent(message=jsonFromPatch(p), event='patch')
-
- def addSseHandler(self, handler):
- log.info('addSseHandler %r %r', handler, handler.streamId)
-
- # fail early if id doesn't match
- sources = self._sourcesForHandler(handler)
-
- self.handlers.add(handler)
-
- for source in sources:
- if source not in self.clients and source != COLLECTOR:
- self._localStatements.setSourceState(source, ROOM['connect'])
- self.clients[source] = ReconnectingPatchSource(
- source, listener=lambda p, fullGraph, source=source: self._onPatch(
- source, p, fullGraph))
- self._sendUpdatePatch(handler)
-
- def removeSseHandler(self, handler):
- log.info('removeSseHandler %r', handler)
-
- self.statements.discardHandler(handler)
-
- for source in self._sourcesForHandler(handler):
- for otherHandler in self.handlers:
- if (otherHandler != handler and
- source in self._sourcesForHandler(otherHandler)):
- break
- else:
- self._stopClient(source)
-
- self.handlers.remove(handler)
-
- def _stopClient(self, url):
- if url == COLLECTOR:
- return
-
- self.clients[url].stop()
-
- self.statements.discardSource(url)
-
- self._localStatements.setSourceState(url, None)
- del self.clients[url]
-
-
-class SomeGraph(cyclone.sse.SSEHandler):
- _handlerSerial = 0
- def __init__(self, application, request):
- cyclone.sse.SSEHandler.__init__(self, application, request)
- self.streamId = request.uri[len('/graph/'):]
- self.graphClients = self.settings.graphClients
- self.created = time.time()
-
- self._serial = SomeGraph._handlerSerial
- SomeGraph._handlerSerial += 1
-
- def __repr__(self):
- return '' % self._serial
-
- def stats(self):
- print self.__dict__
- return {
- 'created': self.created,
- 'ageHours': (time.time() - self.created) / 3600,
- 'streamId': self.streamId,
- 'remoteIp': self.request.remote_ip,
- 'userAgent': self.request.headers.get('user-agent'),
- }
-
- def bind(self):
- self.graphClients.addSseHandler(self)
-
- def unbind(self):
- self.graphClients.removeSseHandler(self)
-
-class Stats(cyclone.web.RequestHandler):
- def get(self):
- try:
- stats = self.settings.graphClients.stats()
- except:
- import traceback; traceback.print_exc()
- raise
-
- self.write(json.dumps({'graphClients': stats}, indent=2))
-
-class Root(cyclone.web.RequestHandler):
- def get(self):
- self.write('sse_collector')
-
-if __name__ == '__main__':
-
- arg = docopt("""
- Usage: sse_collector.py [options]
-
- -v Verbose
- """)
-
- if arg['-v']:
- import twisted.python.log
- twisted.python.log.startLogging(sys.stdout)
- log.setLevel(logging.DEBUG)
-
-
- graphClients = GraphClients()
-
- reactor.listenTCP(
- 9072,
- cyclone.web.Application(
- handlers=[
- (r'/', Root),
- (r'/stats', Stats),
- (r'/graph/(.*)', SomeGraph),
- ],
- graphClients=graphClients),
- interface='::')
- reactor.run()
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/README.md
--- a/service/reasoning/twisted_sse_demo/README.md Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,16 +0,0 @@
-Twisted SSE demo
-================
-
-A twisted web server that implements server sent events (SSE)
-
-To run this demo:
-
- python sse_twisted_web.py
-
-Open up http://localhost:12000 in your browser.
-
-To publish events:
-
- curl -d 'data=Hello!' http://localhost:12000/publish
-
-You should see the data you publish in your browser. That's it!
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/__init__.py
--- a/service/reasoning/twisted_sse_demo/__init__.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-# from https://github.com/juggernaut/twisted-sse-demo
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/api_example.py
--- a/service/reasoning/twisted_sse_demo/api_example.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-import time
-
-from eventsource import EventSource
-
-EVENTSOURCE_URL = 'http://localhost:12000/subscribe'
-
-def onmessage(data):
- print 'Got payload with data %s' % data
-
-if __name__ == '__main__':
- eventSource = EventSource(EVENTSOURCE_URL)
- eventSource.onmessage(onmessage, callInThread=True)
- time.sleep(20)
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/eventsource.py
--- a/service/reasoning/twisted_sse_demo/eventsource.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,76 +0,0 @@
-from crochet import setup, run_in_reactor
-from twisted.internet import reactor
-from twisted.internet.defer import Deferred
-from twisted.web.client import Agent
-from twisted.web.http_headers import Headers
-
-from sse_client import EventSourceProtocol
-
-setup()
-
-
-class EventSource(object):
- """
- The main EventSource class
- """
- def __init__(self, url):
- self.url = url
- self.protocol = EventSourceProtocol()
- self.errorHandler = None
- self.stashedError = None
- self.connect()
-
- @run_in_reactor
- def connect(self):
- """
- Connect to the event source URL
- """
- agent = Agent(reactor)
- d = agent.request(
- 'GET',
- self.url,
- Headers({
- 'User-Agent': ['Twisted SSE Client'],
- 'Cache-Control': ['no-cache'],
- 'Accept': ['text/event-stream; charset=utf-8'],
- }),
- None)
- d.addErrback(self.connectError)
- d.addCallback(self.cbRequest)
-
- def cbRequest(self, response):
- if response.code != 200:
- self.callErrorHandler("non 200 response received: %d" %
- response.code)
- else:
- finished = Deferred()
- self.protocol.setFinishedDeferred(finished)
- response.deliverBody(self.protocol)
- return finished
-
- def connectError(self, ignored):
- self.callErrorHandler("error connecting to endpoint: %s" % self.url)
-
- def callErrorHandler(self, msg):
- if self.errorHandler:
- func, callInThread = self.errorHandler
- if callInThread:
- reactor.callInThread(func, msg)
- else:
- func(msg)
- else:
- self.stashedError = msg
-
- def onerror(self, func, callInThread=False):
- self.errorHandler = func, callInThread
- if self.stashedError:
- self.callErrorHandler(self.stashedError)
-
- def onmessage(self, func, callInThread=False):
- self.addEventListener('message', func, callInThread)
-
- def addEventListener(self, event, func, callInThread=False):
- callback = func
- if callInThread:
- callback = lambda data: reactor.callInThread(func, data)
- self.protocol.addCallback(event, callback)
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/requirements.txt
--- a/service/reasoning/twisted_sse_demo/requirements.txt Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-crochet
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/sse_client.py
--- a/service/reasoning/twisted_sse_demo/sse_client.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,67 +0,0 @@
-from twisted.protocols.basic import LineReceiver
-
-
-class EventSourceProtocol(LineReceiver):
- def __init__(self):
- self.MAX_LENGTH = 1 << 20
- self.callbacks = {}
- self.finished = None
- # Initialize the event and data buffers
- self.event = 'message'
- self.data = ''
-
- def lineLengthExceeded(self, line):
- print "line too long"
- raise NotImplementedError
-
- def setFinishedDeferred(self, d):
- self.finished = d
-
- def addCallback(self, event, func):
- self.callbacks[event] = func
-
- def lineReceived(self, line):
- if line == '':
- # Dispatch event
- self.dispatchEvent()
- else:
- try:
- field, value = line.split(':', 1)
- # If value starts with a space, strip it.
- value = lstrip(value)
- except ValueError:
- # We got a line with no colon, treat it as a field(ignore)
- return
-
- if field == '':
- # This is a comment; ignore
- pass
- elif field == 'data':
- self.data += value + '\n'
- elif field == 'event':
- self.event = value
- elif field == 'id':
- # Not implemented
- pass
- elif field == 'retry':
- # Not implemented
- pass
-
- def connectionLost(self, reason):
- if self.finished:
- self.finished.callback(None)
-
- def dispatchEvent(self):
- """
- Dispatch the event
- """
- # If last character is LF, strip it.
- if self.data.endswith('\n'):
- self.data = self.data[:-1]
- if self.event in self.callbacks:
- self.callbacks[self.event](self.data)
- self.data = ''
- self.event = 'message'
-
-def lstrip(value):
- return value[1:] if value.startswith(' ') else value
diff -r a380561fd8a8 -r 7716b1810d6c service/reasoning/twisted_sse_demo/sse_server.py
--- a/service/reasoning/twisted_sse_demo/sse_server.py Mon Apr 16 22:18:49 2018 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,95 +0,0 @@
-import sys
-
-from twisted.web import server, resource
-from twisted.internet import reactor
-from twisted.python import log
-
-
-class Root(resource.Resource):
- """
- Root resource; serves JavaScript
- """
- def getChild(self, name, request):
- if name == '':
- return self
- return resource.Resource.getChild(self, name, request)
-
- def render_GET(self, request):
- return r"""
-
-
-
-
-
- Welcome to the SSE demo
- Event data:
-
-
-
- """
-
-
-class Subscribe(resource.Resource):
- """
- Implements the subscribe resource
- """
- isLeaf = True
-
- def __init__(self):
- self.subscribers = set()
-
- def render_GET(self, request):
- request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
- request.setResponseCode(200)
- self.subscribers.add(request)
- d = request.notifyFinish()
- d.addBoth(self.removeSubscriber)
- log.msg("Adding subscriber...")
- request.write("")
- return server.NOT_DONE_YET
-
- def publishToAll(self, data):
- for subscriber in self.subscribers:
- for line in data:
- subscriber.write("data: %s\r\n" % line)
- # NOTE: the last CRLF is required to dispatch the event at the client
- subscriber.write("\r\n")
-
- def removeSubscriber(self, subscriber):
- if subscriber in self.subscribers:
- log.msg("Removing subscriber..")
- self.subscribers.remove(subscriber)
-
-
-class Publish(resource.Resource):
- """
- Implements the publish resource
- """
- isLeaf = True
-
- def __init__(self, subscriber):
- self.subscriber = subscriber
-
- def render_POST(self, request):
- if 'data' not in request.args:
- request.setResponseCode(400)
- return "The parameter 'data' must be set\n"
- data = request.args.get('data')
- self.subscriber.publishToAll(data)
- return 'Thank you for publishing data %s\n' % '\n'.join(data)
-
-
-root = Root()
-subscribe = Subscribe()
-root.putChild('subscribe', subscribe)
-root.putChild('publish', Publish(subscribe))
-site = server.Site(root)
-reactor.listenTCP(12000, site)
-log.startLogging(sys.stdout)
-reactor.run()