Mercurial > code > home > repos > homeauto
changeset 1099:cb94ea3495b2
Copy from from https://github.com/juggernaut/twisted-sse-demo
Ignore-this: a252d3c6e023cb83fd10bfbae78e05d9
darcs-hash:bd76fd3b996c7225f03930b3796dc4ddc7783d9a
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 19 Aug 2016 10:53:03 -0700 |
parents | b5906f6fce3f |
children | aef48ac655b1 |
files | service/reasoning/twisted_sse_demo/README.md service/reasoning/twisted_sse_demo/api_example.py service/reasoning/twisted_sse_demo/eventsource.py service/reasoning/twisted_sse_demo/requirements.txt service/reasoning/twisted_sse_demo/sse_client.py service/reasoning/twisted_sse_demo/sse_server.py |
diffstat | 6 files changed, 264 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/README.md Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,16 @@ +Twisted SSE demo +================ + +A twisted web server that implements server sent events (SSE) + +To run this demo: + + python sse_twisted_web.py + +Open up http://localhost:12000 in your browser. + +To publish events: + + curl -d 'data=Hello!' http://localhost:12000/publish + +You should see the data you publish in your browser. That's it!
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/api_example.py Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,13 @@ +import time + +from eventsource import EventSource + +EVENTSOURCE_URL = 'http://localhost:12000/subscribe' + +def onmessage(data): + print 'Got payload with data %s' % data + +if __name__ == '__main__': + eventSource = EventSource(EVENTSOURCE_URL) + eventSource.onmessage(onmessage, callInThread=True) + time.sleep(20)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/eventsource.py Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,77 @@ +from crochet import setup, run_in_reactor +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +from sse_client import EventSourceProtocol + +#setup() + + +class EventSource(object): + """ + The main EventSource class + """ + def __init__(self, url): + self.url = url + self.protocol = EventSourceProtocol() + self.errorHandler = None + self.stashedError = None + self.connect() + + #@run_in_reactor + def connect(self): + """ + Connect to the event source URL + """ + agent = Agent(reactor) + d = agent.request( + 'GET', + self.url, + Headers({ + 'User-Agent': ['Twisted SSE Client'], + 'Cache-Control': ['no-cache'], + 'Accept': ['text/event-stream; charset=utf-8'], + }), + None) + d.addErrback(self.connectError) + d.addCallback(self.cbRequest) + + def cbRequest(self, response): + print 'cbRequest', response.code + if response.code != 200: + self.callErrorHandler("non 200 response received: %d" % + response.code) + else: + finished = Deferred() + self.protocol.setFinishedDeferred(finished) + response.deliverBody(self.protocol) + return finished + + def connectError(self, ignored): + self.callErrorHandler("error connecting to endpoint: %s" % self.url) + + def callErrorHandler(self, msg): + if self.errorHandler: + func, callInThread = self.errorHandler + if callInThread: + reactor.callInThread(func, msg) + else: + func(msg) + else: + self.stashedError = msg + + def onerror(self, func, callInThread=False): + self.errorHandler = func, callInThread + if self.stashedError: + self.callErrorHandler(self.stashedError) + + def onmessage(self, func, callInThread=False): + self.addEventListener('message', func, callInThread) + + def addEventListener(self, event, func, callInThread=False): + callback = func + if callInThread: + callback = lambda data: reactor.callInThread(func, data) + self.protocol.addCallback(event, callback)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/requirements.txt Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,1 @@ +crochet
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/sse_client.py Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,62 @@ +from twisted.protocols.basic import LineReceiver + + +class EventSourceProtocol(LineReceiver): + def __init__(self): + self.callbacks = {} + self.finished = None + # Initialize the event and data buffers + self.event = 'message' + self.data = '' + + def setFinishedDeferred(self, d): + self.finished = d + + def addCallback(self, event, func): + self.callbacks[event] = func + + def lineReceived(self, line): + if line == '': + # Dispatch event + self.dispatchEvent() + else: + try: + field, value = line.split(':', 1) + # If value starts with a space, strip it. + value = lstrip(value) + except ValueError: + # We got a line with no colon, treat it as a field(ignore) + return + + if field == '': + # This is a comment; ignore + pass + elif field == 'data': + self.data += value + '\n' + elif field == 'event': + self.event = value + elif field == 'id': + # Not implemented + pass + elif field == 'retry': + # Not implemented + pass + + def connectionLost(self, reason): + if self.finished: + self.finished.callback(None) + + def dispatchEvent(self): + """ + Dispatch the event + """ + # If last character is LF, strip it. + if self.data.endswith('\n'): + self.data = self.data[:-1] + if self.event in self.callbacks: + self.callbacks[self.event](self.data) + self.data = '' + self.event = 'message' + +def lstrip(value): + return value[1:] if value.startswith(' ') else value
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/reasoning/twisted_sse_demo/sse_server.py Fri Aug 19 10:53:03 2016 -0700 @@ -0,0 +1,95 @@ +import sys + +from twisted.web import server, resource +from twisted.internet import reactor +from twisted.python import log + + +class Root(resource.Resource): + """ + Root resource; serves JavaScript + """ + def getChild(self, name, request): + if name == '': + return self + return resource.Resource.getChild(self, name, request) + + def render_GET(self, request): + return r""" + <html> + <head> + <script language="JavaScript"> + eventSource = new EventSource("http://localhost:12000/subscribe"); + eventSource.onmessage = function(event) { + element = document.getElementById("event-data"); + element.innerHTML = event.data; + }; + </script> + </head> + <body> + <h1> Welcome to the SSE demo </h1> + <h3> Event data: </h3> + <p id="event-data"></p> + </body> + </html> + """ + + +class Subscribe(resource.Resource): + """ + Implements the subscribe resource + """ + isLeaf = True + + def __init__(self): + self.subscribers = set() + + def render_GET(self, request): + request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + request.setResponseCode(200) + self.subscribers.add(request) + d = request.notifyFinish() + d.addBoth(self.removeSubscriber) + log.msg("Adding subscriber...") + request.write("") + return server.NOT_DONE_YET + + def publishToAll(self, data): + for subscriber in self.subscribers: + for line in data: + subscriber.write("data: %s\r\n" % line) + # NOTE: the last CRLF is required to dispatch the event at the client + subscriber.write("\r\n") + + def removeSubscriber(self, subscriber): + if subscriber in self.subscribers: + log.msg("Removing subscriber..") + self.subscribers.remove(subscriber) + + +class Publish(resource.Resource): + """ + Implements the publish resource + """ + isLeaf = True + + def __init__(self, subscriber): + self.subscriber = subscriber + + def render_POST(self, request): + if 'data' not in request.args: + request.setResponseCode(400) + return "The parameter 'data' must be set\n" + data = request.args.get('data') + self.subscriber.publishToAll(data) + return 'Thank you for publishing data %s\n' % '\n'.join(data) + + +root = Root() +subscribe = Subscribe() +root.putChild('subscribe', subscribe) +root.putChild('publish', Publish(subscribe)) +site = server.Site(root) +reactor.listenTCP(12000, site) +log.startLogging(sys.stdout) +reactor.run()