changeset 294:14ac4a210dbc

Copy from from https://github.com/juggernaut/twisted-sse-demo Ignore-this: a252d3c6e023cb83fd10bfbae78e05d9
author drewp@bigasterisk.com
date Fri, 19 Aug 2016 10:53:03 -0700
parents fc0e42933baa
children d25ac47f4820
files service/reasoning/twisted_sse_demo/README.md service/reasoning/twisted_sse_demo/api_example.py service/reasoning/twisted_sse_demo/eventsource.py service/reasoning/twisted_sse_demo/requirements.txt service/reasoning/twisted_sse_demo/sse_client.py service/reasoning/twisted_sse_demo/sse_server.py
diffstat 6 files changed, 264 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/README.md	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,16 @@
+Twisted SSE demo
+================
+
+A twisted web server that implements server sent events (SSE)
+
+To run this demo:
+
+    python sse_twisted_web.py
+    
+Open up http://localhost:12000 in your browser.
+
+To publish events:
+
+    curl -d 'data=Hello!' http://localhost:12000/publish
+    
+You should see the data you publish in your browser. That's it!
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/api_example.py	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,13 @@
+import time
+
+from eventsource import EventSource
+
+EVENTSOURCE_URL = 'http://localhost:12000/subscribe'
+
+def onmessage(data):
+    print 'Got payload with data %s' % data
+
+if __name__ == '__main__':
+    eventSource = EventSource(EVENTSOURCE_URL)
+    eventSource.onmessage(onmessage, callInThread=True)
+    time.sleep(20)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/eventsource.py	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,77 @@
+from crochet import setup, run_in_reactor
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from sse_client import EventSourceProtocol
+
+#setup()
+
+
+class EventSource(object):
+    """
+    The main EventSource class
+    """
+    def __init__(self, url):
+        self.url = url
+        self.protocol = EventSourceProtocol()
+        self.errorHandler = None
+        self.stashedError = None
+        self.connect()
+
+    #@run_in_reactor
+    def connect(self):
+        """
+        Connect to the event source URL
+        """
+        agent = Agent(reactor)
+        d = agent.request(
+            'GET',
+            self.url,
+            Headers({
+                'User-Agent': ['Twisted SSE Client'],
+                'Cache-Control': ['no-cache'],
+                'Accept': ['text/event-stream; charset=utf-8'],
+            }),
+            None)
+        d.addErrback(self.connectError)
+        d.addCallback(self.cbRequest)
+
+    def cbRequest(self, response):
+        print 'cbRequest', response.code
+        if response.code != 200:
+            self.callErrorHandler("non 200 response received: %d" %
+                                  response.code)
+        else:
+            finished = Deferred()
+            self.protocol.setFinishedDeferred(finished)
+            response.deliverBody(self.protocol)
+            return finished
+
+    def connectError(self, ignored):
+        self.callErrorHandler("error connecting to endpoint: %s" % self.url)
+
+    def callErrorHandler(self, msg):
+        if self.errorHandler:
+            func, callInThread = self.errorHandler
+            if callInThread:
+                reactor.callInThread(func, msg)
+            else:
+                func(msg)
+        else:
+            self.stashedError = msg
+
+    def onerror(self, func, callInThread=False):
+        self.errorHandler = func, callInThread
+        if self.stashedError:
+            self.callErrorHandler(self.stashedError)
+
+    def onmessage(self, func, callInThread=False):
+        self.addEventListener('message', func, callInThread)
+
+    def addEventListener(self, event, func, callInThread=False):
+        callback = func
+        if callInThread:
+            callback = lambda data: reactor.callInThread(func, data)
+        self.protocol.addCallback(event, callback)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/requirements.txt	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,1 @@
+crochet
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/sse_client.py	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,62 @@
+from twisted.protocols.basic import LineReceiver
+
+
+class EventSourceProtocol(LineReceiver):
+    def __init__(self):
+        self.callbacks = {}
+        self.finished = None
+        # Initialize the event and data buffers
+        self.event = 'message'
+        self.data = ''
+
+    def setFinishedDeferred(self, d):
+        self.finished = d
+
+    def addCallback(self, event, func):
+        self.callbacks[event] = func
+        
+    def lineReceived(self, line):
+        if line == '':
+            # Dispatch event
+            self.dispatchEvent()
+        else:
+            try:
+                field, value = line.split(':', 1)
+                # If value starts with a space, strip it.
+                value = lstrip(value)
+            except ValueError:
+                # We got a line with no colon, treat it as a field(ignore)
+                return
+
+            if field == '':
+                # This is a comment; ignore
+                pass
+            elif field == 'data':
+                self.data += value + '\n'
+            elif field == 'event':
+                self.event = value
+            elif field == 'id':
+                # Not implemented
+                pass
+            elif field == 'retry':
+                # Not implemented
+                pass
+
+    def connectionLost(self, reason):
+        if self.finished:
+            self.finished.callback(None)
+
+    def dispatchEvent(self):
+        """
+        Dispatch the event
+        """
+        # If last character is LF, strip it.
+        if self.data.endswith('\n'):
+            self.data = self.data[:-1]
+        if self.event in self.callbacks:
+            self.callbacks[self.event](self.data)
+        self.data = ''
+        self.event = 'message'
+
+def lstrip(value):
+    return value[1:] if value.startswith(' ') else value
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/reasoning/twisted_sse_demo/sse_server.py	Fri Aug 19 10:53:03 2016 -0700
@@ -0,0 +1,95 @@
+import sys
+
+from twisted.web import server, resource
+from twisted.internet import reactor
+from twisted.python import log
+
+
+class Root(resource.Resource):
+    """
+    Root resource; serves JavaScript
+    """
+    def getChild(self, name, request):
+        if name == '':
+            return self
+        return resource.Resource.getChild(self, name, request)
+
+    def render_GET(self, request):
+        return r"""
+        <html>
+            <head>
+                <script language="JavaScript">
+                        eventSource = new EventSource("http://localhost:12000/subscribe");
+                        eventSource.onmessage = function(event) {
+                            element = document.getElementById("event-data");
+                            element.innerHTML = event.data;
+                        };
+                    </script>
+            </head>
+            <body>
+                <h1> Welcome to the SSE demo </h1>
+                <h3> Event data: </h3>
+                <p id="event-data"></p>
+            </body>
+        </html>
+        """
+
+
+class Subscribe(resource.Resource):
+    """
+    Implements the subscribe resource
+    """
+    isLeaf = True
+
+    def __init__(self):
+        self.subscribers = set()
+
+    def render_GET(self, request):
+        request.setHeader('Content-Type', 'text/event-stream; charset=utf-8')
+        request.setResponseCode(200)
+        self.subscribers.add(request)
+        d = request.notifyFinish()
+        d.addBoth(self.removeSubscriber)
+        log.msg("Adding subscriber...")
+        request.write("")
+        return server.NOT_DONE_YET
+
+    def publishToAll(self, data):
+        for subscriber in self.subscribers:
+            for line in data:
+                subscriber.write("data: %s\r\n" % line)
+            # NOTE: the last CRLF is required to dispatch the event at the client
+            subscriber.write("\r\n")
+
+    def removeSubscriber(self, subscriber):
+        if subscriber in self.subscribers:
+            log.msg("Removing subscriber..")
+            self.subscribers.remove(subscriber)
+
+
+class Publish(resource.Resource):
+    """
+    Implements the publish resource
+    """
+    isLeaf = True
+
+    def __init__(self, subscriber):
+        self.subscriber = subscriber
+
+    def render_POST(self, request):
+        if 'data' not in request.args:
+            request.setResponseCode(400)
+            return "The parameter 'data' must be set\n"
+        data = request.args.get('data')
+        self.subscriber.publishToAll(data)
+        return 'Thank you for publishing data %s\n' % '\n'.join(data)
+
+
+root = Root()
+subscribe = Subscribe()
+root.putChild('subscribe', subscribe)
+root.putChild('publish', Publish(subscribe))
+site = server.Site(root)
+reactor.listenTCP(12000, site)
+log.startLogging(sys.stdout)
+reactor.run()