Mercurial > code > home > repos > homeauto
changeset 1156:ee168d55524a
reasoning & collector move into docker images
Ignore-this: 67e97d307eba96791cbe77e57c57ad57
darcs-hash:47056d579a870b473e95f4eb7897aae0a97c03cc
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 03 Sep 2018 00:45:34 -0700 |
parents | 3d478b05f9b1 |
children | 6d67b23f3a75 |
files | lib/patchsource.py lib/twisted_sse_demo/README.md lib/twisted_sse_demo/__init__.py lib/twisted_sse_demo/api_example.py lib/twisted_sse_demo/eventsource.py lib/twisted_sse_demo/requirements.txt lib/twisted_sse_demo/sse_client.py lib/twisted_sse_demo/sse_server.py service/collector/Dockerfile service/collector/makefile service/collector/requirements.txt service/collector/sse_collector.py service/reasoning/Dockerfile service/reasoning/inputgraph.py service/reasoning/makefile service/reasoning/patchsource.py service/reasoning/reasoning.py service/reasoning/requirements.txt service/reasoning/sse_collector.py service/reasoning/twisted_sse_demo/README.md service/reasoning/twisted_sse_demo/__init__.py service/reasoning/twisted_sse_demo/api_example.py service/reasoning/twisted_sse_demo/eventsource.py service/reasoning/twisted_sse_demo/requirements.txt service/reasoning/twisted_sse_demo/sse_client.py service/reasoning/twisted_sse_demo/sse_server.py |
diffstat | 26 files changed, 879 insertions(+), 822 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/patchsource.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,143 @@ +import sys, logging +import traceback +from twisted.internet import reactor, defer +from twisted_sse_demo.eventsource import EventSource +from rdflib import ConjunctiveGraph +from rdflib.parser import StringInputSource + +sys.path.append("../../lib") +from patchablegraph import patchFromJson + +sys.path.append("/my/proj/rdfdb") +from rdfdb.patch import Patch + +log = logging.getLogger('fetch') + +class PatchSource(object): + """wrap EventSource so it emits Patch objects and has an explicit stop method.""" + def __init__(self, url): + self.url = url + + # add callbacks to these to learn if we failed to connect + # (approximately) or if the ccnnection was unexpectedly lost + self.connectionFailed = defer.Deferred() + self.connectionLost = defer.Deferred() + + self._listeners = set() + log.info('start read from %s', url) + # note: fullGraphReceived isn't guaranteed- the stream could + # start with patches + self._fullGraphReceived = False + self._eventSource = EventSource(url.toPython().encode('utf8')) + self._eventSource.protocol.delimiter = '\n' + + self._eventSource.addEventListener('fullGraph', self._onFullGraph) + self._eventSource.addEventListener('patch', self._onPatch) + self._eventSource.onerror(self._onError) + + origSet = self._eventSource.protocol.setFinishedDeferred + def sfd(d): + origSet(d) + d.addCallback(self._onDisconnect) + self._eventSource.protocol.setFinishedDeferred = sfd + + def stats(self): + return { + 'url': self.url, + 'fullGraphReceived': self._fullGraphReceived, + } + + def addPatchListener(self, func): + """ + func(patch, fullGraph=[true if the patch is the initial fullgraph]) + """ + self._listeners.add(func) + + def stop(self): + log.info('stop read from %s', self.url) + try: + self._eventSource.protocol.stopProducing() # needed? + except AttributeError: + pass + self._eventSource = None + + def _onDisconnect(self, a): + log.debug('PatchSource._onDisconnect from %s', self.url) + # skip this if we're doing a stop? + self.connectionLost.callback(None) + + def _onError(self, msg): + log.debug('PatchSource._onError from %s %r', self.url, msg) + if not self._fullGraphReceived: + self.connectionFailed.callback(msg) + else: + self.connectionLost.callback(msg) + + def _onFullGraph(self, message): + try: + g = ConjunctiveGraph() + g.parse(StringInputSource(message), format='json-ld') + p = Patch(addGraph=g) + self._sendPatch(p, fullGraph=True) + except: + log.error(traceback.format_exc()) + raise + self._fullGraphReceived = True + + def _onPatch(self, message): + try: + p = patchFromJson(message) + self._sendPatch(p, fullGraph=False) + except: + log.error(traceback.format_exc()) + raise + + def _sendPatch(self, p, fullGraph): + log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) + for lis in self._listeners: + lis(p, fullGraph=fullGraph) + + def __del__(self): + if self._eventSource: + raise ValueError + +class ReconnectingPatchSource(object): + """ + PatchSource api, but auto-reconnects internally and takes listener + at init time to not miss any patches. You'll get another + fullGraph=True patch if we have to reconnect. + + todo: generate connection stmts in here + """ + def __init__(self, url, listener): + self.url = url + self._stopped = False + self._listener = listener + self._reconnect() + + def _reconnect(self): + if self._stopped: + return + self._ps = PatchSource(self.url) + self._ps.addPatchListener(self._onPatch) + self._ps.connectionFailed.addCallback(self._onConnectionFailed) + self._ps.connectionLost.addCallback(self._onConnectionLost) + + def _onPatch(self, p, fullGraph): + self._listener(p, fullGraph=fullGraph) + + def stats(self): + return { + 'reconnectedPatchSource': self._ps.stats(), + } + + def stop(self): + self._stopped = True + self._ps.stop() + + def _onConnectionFailed(self, arg): + reactor.callLater(60, self._reconnect) + + def _onConnectionLost(self, arg): + reactor.callLater(60, self._reconnect) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/README.md Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,16 @@ +Twisted SSE demo +================ + +A twisted web server that implements server sent events (SSE) + +To run this demo: + + python sse_twisted_web.py + +Open up http://localhost:12000 in your browser. + +To publish events: + + curl -d 'data=Hello!' http://localhost:12000/publish + +You should see the data you publish in your browser. That's it!
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/__init__.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,1 @@ +# from https://github.com/juggernaut/twisted-sse-demo
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/api_example.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,13 @@ +import time + +from eventsource import EventSource + +EVENTSOURCE_URL = 'http://localhost:12000/subscribe' + +def onmessage(data): + print 'Got payload with data %s' % data + +if __name__ == '__main__': + eventSource = EventSource(EVENTSOURCE_URL) + eventSource.onmessage(onmessage, callInThread=True) + time.sleep(20)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/eventsource.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,76 @@ +from crochet import setup, run_in_reactor +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +from sse_client import EventSourceProtocol + +setup() + + +class EventSource(object): + """ + The main EventSource class + """ + def __init__(self, url): + self.url = url + self.protocol = EventSourceProtocol() + self.errorHandler = None + self.stashedError = None + self.connect() + + @run_in_reactor + def connect(self): + """ + Connect to the event source URL + """ + agent = Agent(reactor) + d = agent.request( + 'GET', + self.url, + Headers({ + 'User-Agent': ['Twisted SSE Client'], + 'Cache-Control': ['no-cache'], + 'Accept': ['text/event-stream; charset=utf-8'], + }), + None) + d.addErrback(self.connectError) + d.addCallback(self.cbRequest) + + def cbRequest(self, response): + if response.code != 200: + self.callErrorHandler("non 200 response received: %d" % + response.code) + else: + finished = Deferred() + self.protocol.setFinishedDeferred(finished) + response.deliverBody(self.protocol) + return finished + + def connectError(self, ignored): + self.callErrorHandler("error connecting to endpoint: %s" % self.url) + + def callErrorHandler(self, msg): + if self.errorHandler: + func, callInThread = self.errorHandler + if callInThread: + reactor.callInThread(func, msg) + else: + func(msg) + else: + self.stashedError = msg + + def onerror(self, func, callInThread=False): + self.errorHandler = func, callInThread + if self.stashedError: + self.callErrorHandler(self.stashedError) + + def onmessage(self, func, callInThread=False): + self.addEventListener('message', func, callInThread) + + def addEventListener(self, event, func, callInThread=False): + callback = func + if callInThread: + callback = lambda data: reactor.callInThread(func, data) + self.protocol.addCallback(event, callback)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/requirements.txt Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,1 @@ +crochet
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/sse_client.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,67 @@ +from twisted.protocols.basic import LineReceiver + + +class EventSourceProtocol(LineReceiver): + def __init__(self): + self.MAX_LENGTH = 1 << 20 + self.callbacks = {} + self.finished = None + # Initialize the event and data buffers + self.event = 'message' + self.data = '' + + def lineLengthExceeded(self, line): + print "line too long" + raise NotImplementedError + + def setFinishedDeferred(self, d): + self.finished = d + + def addCallback(self, event, func): + self.callbacks[event] = func + + def lineReceived(self, line): + if line == '': + # Dispatch event + self.dispatchEvent() + else: + try: + field, value = line.split(':', 1) + # If value starts with a space, strip it. + value = lstrip(value) + except ValueError: + # We got a line with no colon, treat it as a field(ignore) + return + + if field == '': + # This is a comment; ignore + pass + elif field == 'data': + self.data += value + '\n' + elif field == 'event': + self.event = value + elif field == 'id': + # Not implemented + pass + elif field == 'retry': + # Not implemented + pass + + def connectionLost(self, reason): + if self.finished: + self.finished.callback(None) + + def dispatchEvent(self): + """ + Dispatch the event + """ + # If last character is LF, strip it. + if self.data.endswith('\n'): + self.data = self.data[:-1] + if self.event in self.callbacks: + self.callbacks[self.event](self.data) + self.data = '' + self.event = 'message' + +def lstrip(value): + return value[1:] if value.startswith(' ') else value
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse_demo/sse_server.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,95 @@ +import sys + +from twisted.web import server, resource +from twisted.internet import reactor +from twisted.python import log + + +class Root(resource.Resource): + """ + Root resource; serves JavaScript + """ + def getChild(self, name, request): + if name == '': + return self + return resource.Resource.getChild(self, name, request) + + def render_GET(self, request): + return r""" + <html> + <head> + <script language="JavaScript"> + eventSource = new EventSource("http://localhost:12000/subscribe"); + eventSource.onmessage = function(event) { + element = document.getElementById("event-data"); + element.innerHTML = event.data; + }; + </script> + </head> + <body> + <h1> Welcome to the SSE demo </h1> + <h3> Event data: </h3> + <p id="event-data"></p> + </body> + </html> + """ + + +class Subscribe(resource.Resource): + """ + Implements the subscribe resource + """ + isLeaf = True + + def __init__(self): + self.subscribers = set() + + def render_GET(self, request): + request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + request.setResponseCode(200) + self.subscribers.add(request) + d = request.notifyFinish() + d.addBoth(self.removeSubscriber) + log.msg("Adding subscriber...") + request.write("") + return server.NOT_DONE_YET + + def publishToAll(self, data): + for subscriber in self.subscribers: + for line in data: + subscriber.write("data: %s\r\n" % line) + # NOTE: the last CRLF is required to dispatch the event at the client + subscriber.write("\r\n") + + def removeSubscriber(self, subscriber): + if subscriber in self.subscribers: + log.msg("Removing subscriber..") + self.subscribers.remove(subscriber) + + +class Publish(resource.Resource): + """ + Implements the publish resource + """ + isLeaf = True + + def __init__(self, subscriber): + self.subscriber = subscriber + + def render_POST(self, request): + if 'data' not in request.args: + request.setResponseCode(400) + return "The parameter 'data' must be set\n" + data = request.args.get('data') + self.subscriber.publishToAll(data) + return 'Thank you for publishing data %s\n' % '\n'.join(data) + + +root = Root() +subscribe = Subscribe() +root.putChild('subscribe', subscribe) +root.putChild('publish', Publish(subscribe)) +site = server.Site(root) +reactor.listenTCP(12000, site) +log.startLogging(sys.stdout) +reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/Dockerfile Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,13 @@ +FROM bang6:5000/base_x86 + +WORKDIR /opt + +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +COPY twisted_sse_demo ./twisted_sse_demo +COPY *.py req* ./ + +EXPOSE 9072 + +CMD [ "python", "./sse_collector.py" ]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/makefile Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,22 @@ +JOB=collector +PORT=9072 + +TAG=bang6:5000/${JOB}_x86:latest + +build_image: + rm -rf tmp_ctx + mkdir -p tmp_ctx + cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py req* tmp_ctx + docker build --network=host -t ${TAG} tmp_ctx + docker push ${TAG} + rm -r tmp_ctx + +shell: + docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash + +local_run: + docker run --rm -it -p ${PORT}:${PORT} \ + -v `pwd`:/mnt \ + --net=host \ + ${TAG} \ + python /mnt/sse_collector.py -v
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/requirements.txt Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,16 @@ +crochet==1.5.0 +cyclone +docopt +ipdb +service_identity +twisted + +#rdflib==4.2.2 +git+http://github.com/drewp/rdflib.git@5fa18be1231a5e4dfc86ec28f2f754158c6f6f0b#egg=rdflib + +#rdflib-jsonld==0.4.0 +git+http://github.com/RDFLib/rdflib-jsonld@cc5f005b222105724cd59c6069df9982fbd28c98#egg=rdflib_jsonld + +git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi +git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales +https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/collector/sse_collector.py Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,366 @@ +from __future__ import division +""" +requesting /graph/foo returns an SSE patch stream that's the +result of fetching multiple other SSE patch streams. The result stream +may include new statements injected by this service. + +Future: +- filter out unneeded stmts from the sources +- give a time resolution and concatenate any patches that come faster than that res +""" +from crochet import no_setup +no_setup() + +import sys, logging, collections, json, time +from twisted.internet import reactor +import cyclone.web, cyclone.sse +from rdflib import URIRef, Namespace +from docopt import docopt + +sys.path.append('/opt') # docker is putting ../../lib/ here +from logsetup import log +from patchablegraph import jsonFromPatch + +from rdfdb.patch import Patch + +from patchsource import ReconnectingPatchSource + +ROOM = Namespace("http://projects.bigasterisk.com/room/") +COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') + +from sse_collector_config import config + +class LocalStatements(object): + """ + functions that make statements originating from sse_collector itself + """ + def __init__(self, applyPatch): + self.applyPatch = applyPatch + self._sourceState = {} # source: state URIRef + + def setSourceState(self, source, state): + """ + add a patch to the COLLECTOR graph about the state of this + source. state=None to remove the source. + """ + oldState = self._sourceState.get(source, None) + if state == oldState: + return + log.info('source state %s -> %s', source, state) + if oldState is None: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch(addQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], state, COLLECTOR), + ])) + elif state is None: + del self._sourceState[source] + self.applyPatch(COLLECTOR, Patch(delQuads=[ + (COLLECTOR, ROOM['source'], source, COLLECTOR), + (source, ROOM['state'], oldState, COLLECTOR), + ])) + else: + self._sourceState[source] = state + self.applyPatch(COLLECTOR, Patch( + addQuads=[ + (source, ROOM['state'], state, COLLECTOR), + ], + delQuads=[ + (source, ROOM['state'], oldState, COLLECTOR), + ])) + +def abbrevTerm(t): + if isinstance(t, URIRef): + return (t.replace('http://projects.bigasterisk.com/room/', 'room:') + .replace('http://bigasterisk.com/sse_collector/', 'sc:')) + return t + +def abbrevStmt(stmt): + return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) + +class ActiveStatements(object): + def __init__(self): + + # This table holds statements asserted by any of our sources + # plus local statements that we introduce (source is + # http://bigasterisk.com/sse_collector/). + self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` + + def stats(self): + return { +'len': len(self.statements), + } + + def _postDeleteStatements(self): + statements = self.statements + class PostDeleter(object): + def __enter__(self): + self._garbage = [] + return self + def add(self, stmt): + self._garbage.append(stmt) + def __exit__(self, type, value, traceback): + if type is not None: + raise + for stmt in self._garbage: + del statements[stmt] + return PostDeleter() + + def pprintTable(self): + for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): + print "%03d. %-80s from %s to %s" % ( + i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers) + + def makeSyncPatch(self, handler, sources): + # todo: this could run all handlers at once, which is how we use it anyway + adds = [] + dels = [] + + with self._postDeleteStatements() as garbage: + for stmt, (stmtSources, handlers) in self.statements.iteritems(): + belongsInHandler = not set(sources).isdisjoint(stmtSources) + handlerHasIt = handler in handlers + #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) + if belongsInHandler and not handlerHasIt: + adds.append(stmt) + handlers.add(handler) + elif not belongsInHandler and handlerHasIt: + dels.append(stmt) + handlers.remove(handler) + if not handlers and not stmtSources: + garbage.add(stmt) + + return Patch(addQuads=adds, delQuads=dels) + + def applySourcePatch(self, source, p): + for stmt in p.addQuads: + sourceUrls, handlers = self.statements[stmt] + if source in sourceUrls: + raise ValueError("%s added stmt that it already had: %s" % + (source, abbrevStmt(stmt))) + sourceUrls.add(source) + + with self._postDeleteStatements() as garbage: + for stmt in p.delQuads: + sourceUrls, handlers = self.statements[stmt] + if source not in sourceUrls: + raise ValueError("%s deleting stmt that it didn't have: %s" % + (source, abbrevStmt(stmt))) + sourceUrls.remove(source) + # this is rare, since some handler probably still has + # the stmt we're deleting, but it can happen e.g. when + # a handler was just deleted + if not sourceUrls and not handlers: + garbage.add(stmt) + + def replaceSourceStatements(self, source, stmts): + log.debug('replaceSourceStatements with %s stmts', len(stmts)) + newStmts = set(stmts) + + with self._postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.statements.iteritems(): + if source in sources: + if stmt not in stmts: + sources.remove(source) + if not sources and not handlers: + garbage.add(stmt) + else: + if stmt in stmts: + sources.add(source) + newStmts.discard(stmt) + + self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) + + def discardHandler(self, handler): + with self._postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.statements.iteritems(): + handlers.discard(handler) + if not sources and not handlers: + garbage.add(stmt) + + def discardSource(self, source): + with self._postDeleteStatements() as garbage: + for stmt, (sources, handlers) in self.statements.iteritems(): + sources.discard(source) + if not sources and not handlers: + garbage.add(stmt) + +class GraphClients(object): + """ + All the active PatchSources and SSEHandlers + + To handle all the overlapping-statement cases, we store a set of + true statements along with the sources that are currently + asserting them and the requesters who currently know them. As + statements come and go, we make patches to send to requesters. + """ + def __init__(self): + self.clients = {} # url: PatchSource (COLLECTOR is not listed) + self.handlers = set() # handler + self.statements = ActiveStatements() + + self._localStatements = LocalStatements(self._onPatch) + + def stats(self): + return { + 'clients': [ps.stats() for ps in self.clients.values()], + 'sseHandlers': [h.stats() for h in self.handlers], + 'statements': self.statements.stats(), + } + + def _sourcesForHandler(self, handler): + streamId = handler.streamId + matches = [s for s in config['streams'] if s['id'] == streamId] + if len(matches) != 1: + raise ValueError("%s matches for %r" % (len(matches), streamId)) + return map(URIRef, matches[0]['sources']) + [COLLECTOR] + + def _onPatch(self, source, p, fullGraph=False): + if fullGraph: + # a reconnect may need to resend the full graph even + # though we've already sent some statements + self.statements.replaceSourceStatements(source, p.addQuads) + else: + self.statements.applySourcePatch(source, p) + + self._sendUpdatePatch() + + if log.isEnabledFor(logging.DEBUG): + self.statements.pprintTable() + + if source != COLLECTOR: + self._localStatements.setSourceState( + source, + ROOM['fullGraphReceived'] if fullGraph else + ROOM['patchesReceived']) + + def _sendUpdatePatch(self, handler=None): + """ + send a patch event out this handler to bring it up to date with + self.statements + """ + # reduce loops here- prepare all patches at once + for h in (self.handlers if handler is None else [handler]): + p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h)) + if not p.isNoop(): + log.debug("send patch %s to %s", p.shortSummary(), h) + # This can be a giant line, which was a problem once. Might be + # nice for this service to try to break it up into multiple sends, + # although there's no guarantee at all since any single stmt + # could be any length. + h.sendEvent(message=jsonFromPatch(p), event='patch') + + def addSseHandler(self, handler): + log.info('addSseHandler %r %r', handler, handler.streamId) + + # fail early if id doesn't match + sources = self._sourcesForHandler(handler) + + self.handlers.add(handler) + + for source in sources: + if source not in self.clients and source != COLLECTOR: + self._localStatements.setSourceState(source, ROOM['connect']) + self.clients[source] = ReconnectingPatchSource( + source, listener=lambda p, fullGraph, source=source: self._onPatch( + source, p, fullGraph)) + self._sendUpdatePatch(handler) + + def removeSseHandler(self, handler): + log.info('removeSseHandler %r', handler) + + self.statements.discardHandler(handler) + + for source in self._sourcesForHandler(handler): + for otherHandler in self.handlers: + if (otherHandler != handler and + source in self._sourcesForHandler(otherHandler)): + break + else: + self._stopClient(source) + + self.handlers.remove(handler) + + def _stopClient(self, url): + if url == COLLECTOR: + return + + self.clients[url].stop() + + self.statements.discardSource(url) + + self._localStatements.setSourceState(url, None) + del self.clients[url] + + +class SomeGraph(cyclone.sse.SSEHandler): + _handlerSerial = 0 + def __init__(self, application, request): + cyclone.sse.SSEHandler.__init__(self, application, request) + self.streamId = request.uri[len('/graph/'):] + self.graphClients = self.settings.graphClients + self.created = time.time() + + self._serial = SomeGraph._handlerSerial + SomeGraph._handlerSerial += 1 + + def __repr__(self): + return '<Handler #%s>' % self._serial + + def stats(self): + print self.__dict__ + return { + 'created': self.created, + 'ageHours': (time.time() - self.created) / 3600, + 'streamId': self.streamId, + 'remoteIp': self.request.remote_ip, + 'userAgent': self.request.headers.get('user-agent'), + } + + def bind(self): + self.graphClients.addSseHandler(self) + + def unbind(self): + self.graphClients.removeSseHandler(self) + +class Stats(cyclone.web.RequestHandler): + def get(self): + try: + stats = self.settings.graphClients.stats() + except: + import traceback; traceback.print_exc() + raise + + self.write(json.dumps({'graphClients': stats}, indent=2)) + +class Root(cyclone.web.RequestHandler): + def get(self): + self.write('<html><body>sse_collector</body></html>') + +if __name__ == '__main__': + + arg = docopt(""" + Usage: sse_collector.py [options] + + -v Verbose + """) + + if arg['-v']: + import twisted.python.log + twisted.python.log.startLogging(sys.stdout) + log.setLevel(logging.DEBUG) + + + graphClients = GraphClients() + + reactor.listenTCP( + 9072, + cyclone.web.Application( + handlers=[ + (r'/', Root), + (r'/stats', Stats), + (r'/graph/(.*)', SomeGraph), + ], + graphClients=graphClients), + interface='::') + reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/Dockerfile Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,14 @@ +FROM bang6:5000/base_x86 + +WORKDIR /opt + +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +COPY twisted_sse_demo ./twisted_sse_demo +COPY *.n3 *.py req* ./ +COPY input ./input + +EXPOSE 9071 + +CMD [ "python", "./reasoning.py" ]
--- a/service/reasoning/inputgraph.py Mon Apr 16 22:18:49 2018 -0700 +++ b/service/reasoning/inputgraph.py Mon Sep 03 00:45:34 2018 -0700 @@ -12,8 +12,8 @@ from patchsource import ReconnectingPatchSource -sys.path.append("/my/proj/light9") -from light9.rdfdb.rdflibpatch import patchQuads +sys.path.append("/my/proj/rdfdb") +from rdfdb.rdflibpatch import patchQuads log = logging.getLogger('fetch')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/makefile Mon Sep 03 00:45:34 2018 -0700 @@ -0,0 +1,23 @@ +JOB=reasoning +PORT=9071 + +TAG=bang6:5000/${JOB}_x86:latest + +build_image: + rm -rf tmp_ctx + mkdir -p tmp_ctx + cp -a Dockerfile ../../lib/*.py ../../lib/twisted_sse_demo *.py *.n3 input req* tmp_ctx + rsync -a input tmp_ctx/ + docker build --network=host -t ${TAG} tmp_ctx + docker push ${TAG} + rm -r tmp_ctx + +shell: + docker run --rm -it --cap-add SYS_PTRACE --net=host ${TAG} /bin/bash + +local_run: + docker run --rm -it -p ${PORT}:${PORT} \ + -v `pwd`:/mnt \ + --net=host \ + ${TAG} \ + python /mnt/${JOB}.py -iro
--- a/service/reasoning/patchsource.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,143 +0,0 @@ -import sys, logging -import traceback -from twisted.internet import reactor, defer -from twisted_sse_demo.eventsource import EventSource -from rdflib import ConjunctiveGraph -from rdflib.parser import StringInputSource - -sys.path.append("../../lib") -from patchablegraph import patchFromJson - -sys.path.append("/my/proj/light9") -from light9.rdfdb.patch import Patch - -log = logging.getLogger('fetch') - -class PatchSource(object): - """wrap EventSource so it emits Patch objects and has an explicit stop method.""" - def __init__(self, url): - self.url = url - - # add callbacks to these to learn if we failed to connect - # (approximately) or if the ccnnection was unexpectedly lost - self.connectionFailed = defer.Deferred() - self.connectionLost = defer.Deferred() - - self._listeners = set() - log.info('start read from %s', url) - # note: fullGraphReceived isn't guaranteed- the stream could - # start with patches - self._fullGraphReceived = False - self._eventSource = EventSource(url.toPython().encode('utf8')) - self._eventSource.protocol.delimiter = '\n' - - self._eventSource.addEventListener('fullGraph', self._onFullGraph) - self._eventSource.addEventListener('patch', self._onPatch) - self._eventSource.onerror(self._onError) - - origSet = self._eventSource.protocol.setFinishedDeferred - def sfd(d): - origSet(d) - d.addCallback(self._onDisconnect) - self._eventSource.protocol.setFinishedDeferred = sfd - - def stats(self): - return { - 'url': self.url, - 'fullGraphReceived': self._fullGraphReceived, - } - - def addPatchListener(self, func): - """ - func(patch, fullGraph=[true if the patch is the initial fullgraph]) - """ - self._listeners.add(func) - - def stop(self): - log.info('stop read from %s', self.url) - try: - self._eventSource.protocol.stopProducing() # needed? - except AttributeError: - pass - self._eventSource = None - - def _onDisconnect(self, a): - log.debug('PatchSource._onDisconnect from %s', self.url) - # skip this if we're doing a stop? - self.connectionLost.callback(None) - - def _onError(self, msg): - log.debug('PatchSource._onError from %s %r', self.url, msg) - if not self._fullGraphReceived: - self.connectionFailed.callback(msg) - else: - self.connectionLost.callback(msg) - - def _onFullGraph(self, message): - try: - g = ConjunctiveGraph() - g.parse(StringInputSource(message), format='json-ld') - p = Patch(addGraph=g) - self._sendPatch(p, fullGraph=True) - except: - log.error(traceback.format_exc()) - raise - self._fullGraphReceived = True - - def _onPatch(self, message): - try: - p = patchFromJson(message) - self._sendPatch(p, fullGraph=False) - except: - log.error(traceback.format_exc()) - raise - - def _sendPatch(self, p, fullGraph): - log.debug('PatchSource %s received patch %s (fullGraph=%s)', self.url, p.shortSummary(), fullGraph) - for lis in self._listeners: - lis(p, fullGraph=fullGraph) - - def __del__(self): - if self._eventSource: - raise ValueError - -class ReconnectingPatchSource(object): - """ - PatchSource api, but auto-reconnects internally and takes listener - at init time to not miss any patches. You'll get another - fullGraph=True patch if we have to reconnect. - - todo: generate connection stmts in here - """ - def __init__(self, url, listener): - self.url = url - self._stopped = False - self._listener = listener - self._reconnect() - - def _reconnect(self): - if self._stopped: - return - self._ps = PatchSource(self.url) - self._ps.addPatchListener(self._onPatch) - self._ps.connectionFailed.addCallback(self._onConnectionFailed) - self._ps.connectionLost.addCallback(self._onConnectionLost) - - def _onPatch(self, p, fullGraph): - self._listener(p, fullGraph=fullGraph) - - def stats(self): - return { - 'reconnectedPatchSource': self._ps.stats(), - } - - def stop(self): - self._stopped = True - self._ps.stop() - - def _onConnectionFailed(self, arg): - reactor.callLater(60, self._reconnect) - - def _onConnectionLost(self, arg): - reactor.callLater(60, self._reconnect) -
--- a/service/reasoning/reasoning.py Mon Apr 16 22:18:49 2018 -0700 +++ b/service/reasoning/reasoning.py Mon Sep 03 00:45:34 2018 -0700 @@ -14,14 +14,12 @@ When do we gather? The services should be able to trigger us, perhaps with PSHB, that their graph has changed. """ - - from crochet import no_setup no_setup() - import json, time, traceback, sys from logging import getLogger, DEBUG, WARN +sys.path.append('/opt') # docker is putting lib/ here from colorlog import ColoredFormatter from docopt import docopt @@ -38,7 +36,6 @@ from inputgraph import InputGraph from escapeoutputstatements import unquoteOutputStatements -sys.path.append("../../lib") from logsetup import log
--- a/service/reasoning/requirements.txt Mon Apr 16 22:18:49 2018 -0700 +++ b/service/reasoning/requirements.txt Mon Sep 03 00:45:34 2018 -0700 @@ -1,11 +1,13 @@ -twisted==17.1.0 -cyclone==1.1 -ipdb==0.10.3 -docopt==0.6.2 +colorlog==2.6.0 +crochet==1.5.0 +cyclone +docopt +ipdb +service_identity +twisted + rdflib==4.2.2 +rdflib_jsonld==0.4.0 git+http://github.com/drewp/FuXi.git@003fb48984e9813808a23ba152798c125718f0e7#egg=FuXi git+http://github.com/drewp/scales.git@448d59fb491b7631877528e7695a93553bfaaa93#egg=scales -service_identity==17.0.0 -crochet==1.5.0 -rdflib_jsonld==0.4.0 - +https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
--- a/service/reasoning/sse_collector.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,396 +0,0 @@ -from __future__ import division -""" -requesting /graph/foo returns an SSE patch stream that's the -result of fetching multiple other SSE patch streams. The result stream -may include new statements injected by this service. - -Future: -- filter out unneeded stmts from the sources -- give a time resolution and concatenate any patches that come faster than that res -""" - -config = { - 'streams': [ - {'id': 'home', - 'sources': [ -# should be from :reasoning :source ?s -'http://garage:9059/graph/events', # "garage pi" -'http://kitchen:9059/graph/events', # "kitchen pi" -'http://living:9059/graph/events', # "living room pi" -'http://slash:9059/graph/events', # "slash arduino" -'http://bed:9059/graph/events', # "bed pi" -'http://brace6:9059/graph/events', # "brace arduino" -'http://changing:9059/graph/events', # "changing pi" -'http://bang:9075/graph/events', # "env" -'http://bang:9070/graph/events', # "wifi usage" -'http://bang:9099/graph/events', # "trails" -'http://dash:9095/graph/events', # "dash monitor" -'http://dash:9107/graph/events', # "dash x idle" -'http://brace6:9095/graph/events', # "brace monitor" -'http://brace6:9107/graph/events', # "brace x idle" -'http://slash:9095/graph/events', # "slash monitor" -'http://slash:9107/graph/events', # "slash x idle" - - - - ] - }, - ] -} - -from crochet import no_setup -no_setup() - -import sys, logging, collections, json, time -from twisted.internet import reactor -import cyclone.web, cyclone.sse -from rdflib import URIRef, Namespace -from docopt import docopt - - -sys.path.append("../../lib") -from logsetup import log -from patchablegraph import jsonFromPatch - -sys.path.append("/my/proj/light9") -from light9.rdfdb.patch import Patch - -from patchsource import ReconnectingPatchSource - -ROOM = Namespace("http://projects.bigasterisk.com/room/") -COLLECTOR = URIRef('http://bigasterisk.com/sse_collector/') - -class LocalStatements(object): - """ - functions that make statements originating from sse_collector itself - """ - def __init__(self, applyPatch): - self.applyPatch = applyPatch - self._sourceState = {} # source: state URIRef - - def setSourceState(self, source, state): - """ - add a patch to the COLLECTOR graph about the state of this - source. state=None to remove the source. - """ - oldState = self._sourceState.get(source, None) - if state == oldState: - return - log.info('source state %s -> %s', source, state) - if oldState is None: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch(addQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], state, COLLECTOR), - ])) - elif state is None: - del self._sourceState[source] - self.applyPatch(COLLECTOR, Patch(delQuads=[ - (COLLECTOR, ROOM['source'], source, COLLECTOR), - (source, ROOM['state'], oldState, COLLECTOR), - ])) - else: - self._sourceState[source] = state - self.applyPatch(COLLECTOR, Patch( - addQuads=[ - (source, ROOM['state'], state, COLLECTOR), - ], - delQuads=[ - (source, ROOM['state'], oldState, COLLECTOR), - ])) - -def abbrevTerm(t): - if isinstance(t, URIRef): - return (t.replace('http://projects.bigasterisk.com/room/', 'room:') - .replace('http://bigasterisk.com/sse_collector/', 'sc:')) - return t - -def abbrevStmt(stmt): - return '(%s %s %s %s)' % tuple(map(abbrevTerm, stmt)) - -class ActiveStatements(object): - def __init__(self): - - # This table holds statements asserted by any of our sources - # plus local statements that we introduce (source is - # http://bigasterisk.com/sse_collector/). - self.statements = collections.defaultdict(lambda: (set(), set())) # (s,p,o,c): (sourceUrls, handlers)` - - def stats(self): - return { -'len': len(self.statements), - } - - def _postDeleteStatements(self): - statements = self.statements - class PostDeleter(object): - def __enter__(self): - self._garbage = [] - return self - def add(self, stmt): - self._garbage.append(stmt) - def __exit__(self, type, value, traceback): - if type is not None: - raise - for stmt in self._garbage: - del statements[stmt] - return PostDeleter() - - def pprintTable(self): - for i, (stmt, (sources, handlers)) in enumerate(sorted(self.statements.items())): - print "%03d. %-80s from %s to %s" % ( - i, abbrevStmt(stmt), [abbrevTerm(s) for s in sources], handlers) - - def makeSyncPatch(self, handler, sources): - # todo: this could run all handlers at once, which is how we use it anyway - adds = [] - dels = [] - - with self._postDeleteStatements() as garbage: - for stmt, (stmtSources, handlers) in self.statements.iteritems(): - belongsInHandler = not set(sources).isdisjoint(stmtSources) - handlerHasIt = handler in handlers - #log.debug("%s %s %s", abbrevStmt(stmt), belongsInHandler, handlerHasIt) - if belongsInHandler and not handlerHasIt: - adds.append(stmt) - handlers.add(handler) - elif not belongsInHandler and handlerHasIt: - dels.append(stmt) - handlers.remove(handler) - if not handlers and not stmtSources: - garbage.add(stmt) - - return Patch(addQuads=adds, delQuads=dels) - - def applySourcePatch(self, source, p): - for stmt in p.addQuads: - sourceUrls, handlers = self.statements[stmt] - if source in sourceUrls: - raise ValueError("%s added stmt that it already had: %s" % - (source, abbrevStmt(stmt))) - sourceUrls.add(source) - - with self._postDeleteStatements() as garbage: - for stmt in p.delQuads: - sourceUrls, handlers = self.statements[stmt] - if source not in sourceUrls: - raise ValueError("%s deleting stmt that it didn't have: %s" % - (source, abbrevStmt(stmt))) - sourceUrls.remove(source) - # this is rare, since some handler probably still has - # the stmt we're deleting, but it can happen e.g. when - # a handler was just deleted - if not sourceUrls and not handlers: - garbage.add(stmt) - - def replaceSourceStatements(self, source, stmts): - log.debug('replaceSourceStatements with %s stmts', len(stmts)) - newStmts = set(stmts) - - with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): - if source in sources: - if stmt not in stmts: - sources.remove(source) - if not sources and not handlers: - garbage.add(stmt) - else: - if stmt in stmts: - sources.add(source) - newStmts.discard(stmt) - - self.applySourcePatch(source, Patch(addQuads=newStmts, delQuads=[])) - - def discardHandler(self, handler): - with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): - handlers.discard(handler) - if not sources and not handlers: - garbage.add(stmt) - - def discardSource(self, source): - with self._postDeleteStatements() as garbage: - for stmt, (sources, handlers) in self.statements.iteritems(): - sources.discard(source) - if not sources and not handlers: - garbage.add(stmt) - -class GraphClients(object): - """ - All the active PatchSources and SSEHandlers - - To handle all the overlapping-statement cases, we store a set of - true statements along with the sources that are currently - asserting them and the requesters who currently know them. As - statements come and go, we make patches to send to requesters. - """ - def __init__(self): - self.clients = {} # url: PatchSource (COLLECTOR is not listed) - self.handlers = set() # handler - self.statements = ActiveStatements() - - self._localStatements = LocalStatements(self._onPatch) - - def stats(self): - return { - 'clients': [ps.stats() for ps in self.clients.values()], - 'sseHandlers': [h.stats() for h in self.handlers], - 'statements': self.statements.stats(), - } - - def _sourcesForHandler(self, handler): - streamId = handler.streamId - matches = [s for s in config['streams'] if s['id'] == streamId] - if len(matches) != 1: - raise ValueError("%s matches for %r" % (len(matches), streamId)) - return map(URIRef, matches[0]['sources']) + [COLLECTOR] - - def _onPatch(self, source, p, fullGraph=False): - if fullGraph: - # a reconnect may need to resend the full graph even - # though we've already sent some statements - self.statements.replaceSourceStatements(source, p.addQuads) - else: - self.statements.applySourcePatch(source, p) - - self._sendUpdatePatch() - - if log.isEnabledFor(logging.DEBUG): - self.statements.pprintTable() - - if source != COLLECTOR: - self._localStatements.setSourceState( - source, - ROOM['fullGraphReceived'] if fullGraph else - ROOM['patchesReceived']) - - def _sendUpdatePatch(self, handler=None): - """ - send a patch event out this handler to bring it up to date with - self.statements - """ - # reduce loops here- prepare all patches at once - for h in (self.handlers if handler is None else [handler]): - p = self.statements.makeSyncPatch(h, self._sourcesForHandler(h)) - if not p.isNoop(): - log.debug("send patch %s to %s", p.shortSummary(), h) - # This can be a giant line, which was a problem once. Might be - # nice for this service to try to break it up into multiple sends, - # although there's no guarantee at all since any single stmt - # could be any length. - h.sendEvent(message=jsonFromPatch(p), event='patch') - - def addSseHandler(self, handler): - log.info('addSseHandler %r %r', handler, handler.streamId) - - # fail early if id doesn't match - sources = self._sourcesForHandler(handler) - - self.handlers.add(handler) - - for source in sources: - if source not in self.clients and source != COLLECTOR: - self._localStatements.setSourceState(source, ROOM['connect']) - self.clients[source] = ReconnectingPatchSource( - source, listener=lambda p, fullGraph, source=source: self._onPatch( - source, p, fullGraph)) - self._sendUpdatePatch(handler) - - def removeSseHandler(self, handler): - log.info('removeSseHandler %r', handler) - - self.statements.discardHandler(handler) - - for source in self._sourcesForHandler(handler): - for otherHandler in self.handlers: - if (otherHandler != handler and - source in self._sourcesForHandler(otherHandler)): - break - else: - self._stopClient(source) - - self.handlers.remove(handler) - - def _stopClient(self, url): - if url == COLLECTOR: - return - - self.clients[url].stop() - - self.statements.discardSource(url) - - self._localStatements.setSourceState(url, None) - del self.clients[url] - - -class SomeGraph(cyclone.sse.SSEHandler): - _handlerSerial = 0 - def __init__(self, application, request): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.streamId = request.uri[len('/graph/'):] - self.graphClients = self.settings.graphClients - self.created = time.time() - - self._serial = SomeGraph._handlerSerial - SomeGraph._handlerSerial += 1 - - def __repr__(self): - return '<Handler #%s>' % self._serial - - def stats(self): - print self.__dict__ - return { - 'created': self.created, - 'ageHours': (time.time() - self.created) / 3600, - 'streamId': self.streamId, - 'remoteIp': self.request.remote_ip, - 'userAgent': self.request.headers.get('user-agent'), - } - - def bind(self): - self.graphClients.addSseHandler(self) - - def unbind(self): - self.graphClients.removeSseHandler(self) - -class Stats(cyclone.web.RequestHandler): - def get(self): - try: - stats = self.settings.graphClients.stats() - except: - import traceback; traceback.print_exc() - raise - - self.write(json.dumps({'graphClients': stats}, indent=2)) - -class Root(cyclone.web.RequestHandler): - def get(self): - self.write('<html><body>sse_collector</body></html>') - -if __name__ == '__main__': - - arg = docopt(""" - Usage: sse_collector.py [options] - - -v Verbose - """) - - if arg['-v']: - import twisted.python.log - twisted.python.log.startLogging(sys.stdout) - log.setLevel(logging.DEBUG) - - - graphClients = GraphClients() - - reactor.listenTCP( - 9072, - cyclone.web.Application( - handlers=[ - (r'/', Root), - (r'/stats', Stats), - (r'/graph/(.*)', SomeGraph), - ], - graphClients=graphClients), - interface='::') - reactor.run()
--- a/service/reasoning/twisted_sse_demo/README.md Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,16 +0,0 @@ -Twisted SSE demo -================ - -A twisted web server that implements server sent events (SSE) - -To run this demo: - - python sse_twisted_web.py - -Open up http://localhost:12000 in your browser. - -To publish events: - - curl -d 'data=Hello!' http://localhost:12000/publish - -You should see the data you publish in your browser. That's it!
--- a/service/reasoning/twisted_sse_demo/__init__.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -# from https://github.com/juggernaut/twisted-sse-demo
--- a/service/reasoning/twisted_sse_demo/api_example.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -import time - -from eventsource import EventSource - -EVENTSOURCE_URL = 'http://localhost:12000/subscribe' - -def onmessage(data): - print 'Got payload with data %s' % data - -if __name__ == '__main__': - eventSource = EventSource(EVENTSOURCE_URL) - eventSource.onmessage(onmessage, callInThread=True) - time.sleep(20)
--- a/service/reasoning/twisted_sse_demo/eventsource.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,76 +0,0 @@ -from crochet import setup, run_in_reactor -from twisted.internet import reactor -from twisted.internet.defer import Deferred -from twisted.web.client import Agent -from twisted.web.http_headers import Headers - -from sse_client import EventSourceProtocol - -setup() - - -class EventSource(object): - """ - The main EventSource class - """ - def __init__(self, url): - self.url = url - self.protocol = EventSourceProtocol() - self.errorHandler = None - self.stashedError = None - self.connect() - - @run_in_reactor - def connect(self): - """ - Connect to the event source URL - """ - agent = Agent(reactor) - d = agent.request( - 'GET', - self.url, - Headers({ - 'User-Agent': ['Twisted SSE Client'], - 'Cache-Control': ['no-cache'], - 'Accept': ['text/event-stream; charset=utf-8'], - }), - None) - d.addErrback(self.connectError) - d.addCallback(self.cbRequest) - - def cbRequest(self, response): - if response.code != 200: - self.callErrorHandler("non 200 response received: %d" % - response.code) - else: - finished = Deferred() - self.protocol.setFinishedDeferred(finished) - response.deliverBody(self.protocol) - return finished - - def connectError(self, ignored): - self.callErrorHandler("error connecting to endpoint: %s" % self.url) - - def callErrorHandler(self, msg): - if self.errorHandler: - func, callInThread = self.errorHandler - if callInThread: - reactor.callInThread(func, msg) - else: - func(msg) - else: - self.stashedError = msg - - def onerror(self, func, callInThread=False): - self.errorHandler = func, callInThread - if self.stashedError: - self.callErrorHandler(self.stashedError) - - def onmessage(self, func, callInThread=False): - self.addEventListener('message', func, callInThread) - - def addEventListener(self, event, func, callInThread=False): - callback = func - if callInThread: - callback = lambda data: reactor.callInThread(func, data) - self.protocol.addCallback(event, callback)
--- a/service/reasoning/twisted_sse_demo/requirements.txt Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -crochet
--- a/service/reasoning/twisted_sse_demo/sse_client.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,67 +0,0 @@ -from twisted.protocols.basic import LineReceiver - - -class EventSourceProtocol(LineReceiver): - def __init__(self): - self.MAX_LENGTH = 1 << 20 - self.callbacks = {} - self.finished = None - # Initialize the event and data buffers - self.event = 'message' - self.data = '' - - def lineLengthExceeded(self, line): - print "line too long" - raise NotImplementedError - - def setFinishedDeferred(self, d): - self.finished = d - - def addCallback(self, event, func): - self.callbacks[event] = func - - def lineReceived(self, line): - if line == '': - # Dispatch event - self.dispatchEvent() - else: - try: - field, value = line.split(':', 1) - # If value starts with a space, strip it. - value = lstrip(value) - except ValueError: - # We got a line with no colon, treat it as a field(ignore) - return - - if field == '': - # This is a comment; ignore - pass - elif field == 'data': - self.data += value + '\n' - elif field == 'event': - self.event = value - elif field == 'id': - # Not implemented - pass - elif field == 'retry': - # Not implemented - pass - - def connectionLost(self, reason): - if self.finished: - self.finished.callback(None) - - def dispatchEvent(self): - """ - Dispatch the event - """ - # If last character is LF, strip it. - if self.data.endswith('\n'): - self.data = self.data[:-1] - if self.event in self.callbacks: - self.callbacks[self.event](self.data) - self.data = '' - self.event = 'message' - -def lstrip(value): - return value[1:] if value.startswith(' ') else value
--- a/service/reasoning/twisted_sse_demo/sse_server.py Mon Apr 16 22:18:49 2018 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,95 +0,0 @@ -import sys - -from twisted.web import server, resource -from twisted.internet import reactor -from twisted.python import log - - -class Root(resource.Resource): - """ - Root resource; serves JavaScript - """ - def getChild(self, name, request): - if name == '': - return self - return resource.Resource.getChild(self, name, request) - - def render_GET(self, request): - return r""" - <html> - <head> - <script language="JavaScript"> - eventSource = new EventSource("http://localhost:12000/subscribe"); - eventSource.onmessage = function(event) { - element = document.getElementById("event-data"); - element.innerHTML = event.data; - }; - </script> - </head> - <body> - <h1> Welcome to the SSE demo </h1> - <h3> Event data: </h3> - <p id="event-data"></p> - </body> - </html> - """ - - -class Subscribe(resource.Resource): - """ - Implements the subscribe resource - """ - isLeaf = True - - def __init__(self): - self.subscribers = set() - - def render_GET(self, request): - request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') - request.setResponseCode(200) - self.subscribers.add(request) - d = request.notifyFinish() - d.addBoth(self.removeSubscriber) - log.msg("Adding subscriber...") - request.write("") - return server.NOT_DONE_YET - - def publishToAll(self, data): - for subscriber in self.subscribers: - for line in data: - subscriber.write("data: %s\r\n" % line) - # NOTE: the last CRLF is required to dispatch the event at the client - subscriber.write("\r\n") - - def removeSubscriber(self, subscriber): - if subscriber in self.subscribers: - log.msg("Removing subscriber..") - self.subscribers.remove(subscriber) - - -class Publish(resource.Resource): - """ - Implements the publish resource - """ - isLeaf = True - - def __init__(self, subscriber): - self.subscriber = subscriber - - def render_POST(self, request): - if 'data' not in request.args: - request.setResponseCode(400) - return "The parameter 'data' must be set\n" - data = request.args.get('data') - self.subscriber.publishToAll(data) - return 'Thank you for publishing data %s\n' % '\n'.join(data) - - -root = Root() -subscribe = Subscribe() -root.putChild('subscribe', subscribe) -root.putChild('publish', Publish(subscribe)) -site = server.Site(root) -reactor.listenTCP(12000, site) -log.startLogging(sys.stdout) -reactor.run()