Mercurial > code > home > repos > homeauto
changeset 510:b1337ad3ec2d
mv twisted_sse
Ignore-this: a59fadbe80bc4a393b1dab8228e022c1
author | drewp@bigasterisk.com |
---|---|
date | Mon, 22 Apr 2019 21:58:09 -0700 |
parents | c9cadfcb4fdc |
children | 576ee2e2b2a2 |
files | lib/twisted_sse/.gitignore lib/twisted_sse/MANIFEST.in lib/twisted_sse/README.md lib/twisted_sse/__init__.py lib/twisted_sse/api_example.py lib/twisted_sse/eventsource.py lib/twisted_sse/setup.py lib/twisted_sse/sse_client.py lib/twisted_sse/sse_server.py lib/twisted_sse_demo/README.md lib/twisted_sse_demo/__init__.py lib/twisted_sse_demo/api_example.py lib/twisted_sse_demo/eventsource.py lib/twisted_sse_demo/requirements.txt lib/twisted_sse_demo/sse_client.py lib/twisted_sse_demo/sse_server.py |
diffstat | 15 files changed, 282 insertions(+), 272 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/.gitignore Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,1 @@ +*.pyc
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/MANIFEST.in Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,2 @@ +include *.py +include README.md \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/README.md Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,13 @@ + +This is https://github.com/juggernaut/twisted-sse-demo with some +changes, including: + +- rename from twisted-sse-demo to twisted_sse so it can be a package +- add setup.py so it can be depended on +- some python3 support +- remove crochet + +api_example.py is probably out of date. + +drewp is using cyclone.sse for server support and this +twisted_sse.eventsource as the client.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/api_example.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,13 @@ +import time + +from eventsource import EventSource + +EVENTSOURCE_URL = 'http://localhost:12000/subscribe' + +def onmessage(data): + print('Got payload with data %s' % data) + +if __name__ == '__main__': + eventSource = EventSource(EVENTSOURCE_URL) + eventSource.onmessage(onmessage, callInThread=True) + time.sleep(20)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/eventsource.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,79 @@ +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +from .sse_client import EventSourceProtocol + + +class EventSource(object): + """ + The main EventSource class + """ + def __init__(self, url, userAgent): + # type: (str, bytes) + self.url = url + self.userAgent = userAgent + self.protocol = EventSourceProtocol(self.onConnectionLost) + self.errorHandler = None + self.stashedError = None + self.connect() + + def connect(self): + """ + Connect to the event source URL + """ + agent = Agent(reactor, connectTimeout=5) + self.agent = agent + d = agent.request( + b'GET', + self.url, + Headers({ + b'User-Agent': [self.userAgent], + b'Cache-Control': [b'no-cache'], + b'Accept': [b'text/event-stream; charset=utf-8'], + }), + None) + d.addCallbacks(self.cbRequest, self.connectError) + + def cbRequest(self, response): + if response is None: + # seems out of spec, according to https://twistedmatrix.com/documents/current/api/twisted.web.iweb.IAgent.html + raise ValueError('no response for url %r' % self.url) + elif response.code != 200: + self.callErrorHandler("non 200 response received: %d" % + response.code) + else: + response.deliverBody(self.protocol) + + def connectError(self, ignored): + self.callErrorHandler("error connecting to endpoint: %s" % self.url) + + def onConnectionLost(self, reason): + # overridden + reason.printDetailedTraceback() + + def callErrorHandler(self, msg): + if self.errorHandler: + func, callInThread = self.errorHandler + if callInThread: + reactor.callInThread(func, msg) + else: + func(msg) + else: + self.stashedError = msg + + def onerror(self, func, callInThread=False): + self.errorHandler = func, callInThread + if self.stashedError: + self.callErrorHandler(self.stashedError) + + def onmessage(self, func, callInThread=False): + self.addEventListener('message', func, callInThread) + + def addEventListener(self, event, func, callInThread=False): + assert isinstance(event, bytes), event + callback = func + if callInThread: + callback = lambda data: reactor.callInThread(func, data) + self.protocol.addCallback(event, callback)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/setup.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,12 @@ +from distutils.core import setup + +setup( + name='twisted_sse', + version='0.2.0', + packages=['twisted_sse'], + package_dir={'twisted_sse': ''}, + install_requires=['twisted'], + url='https://projects.bigasterisk.com/', + author='Drew Perttula', + author_email='drewp@bigasterisk.com', + )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/sse_client.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,67 @@ +from twisted.protocols.basic import LineReceiver + + +class EventSourceProtocol(LineReceiver): + def __init__(self, onConnectionLost): + self.onConnectionLost = onConnectionLost + self.delimiter = b'\n' + self.MAX_LENGTH = 1 << 20 + self.callbacks = {} + self.finished = None + # Initialize the event and data buffers + self.event = b'message' + self.data = b'' + + def lineLengthExceeded(self, line): + raise NotImplementedError('line too long') + + def setFinishedDeferred(self, d): + self.finished = d + + def addCallback(self, event, func): + self.callbacks[event] = func + + def lineReceived(self, line): + if line == b'': + # Dispatch event + self.dispatchEvent() + else: + try: + field, value = line.split(b':', 1) + # If value starts with a space, strip it. + value = lstrip(value) + except ValueError: + # We got a line with no colon, treat it as a field(ignore) + return + + if field == b'': + # This is a comment; ignore + pass + elif field == b'data': + self.data += value + b'\n' + elif field == b'event': + self.event = value + elif field == b'id': + # Not implemented + pass + elif field == b'retry': + # Not implemented + pass + + def connectionLost(self, reason): + self.onConnectionLost(reason) + + def dispatchEvent(self): + """ + Dispatch the event + """ + # If last character is LF, strip it. + if self.data.endswith(b'\n'): + self.data = self.data[:-1] + if self.event in self.callbacks: + self.callbacks[self.event](self.data) + self.data = b'' + self.event = b'message' + +def lstrip(value): + return value[1:] if value.startswith(b' ') else value
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/sse_server.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,95 @@ +import sys + +from twisted.web import server, resource +from twisted.internet import reactor +from twisted.python import log + + +class Root(resource.Resource): + """ + Root resource; serves JavaScript + """ + def getChild(self, name, request): + if name == '': + return self + return resource.Resource.getChild(self, name, request) + + def render_GET(self, request): + return r""" + <html> + <head> + <script language="JavaScript"> + eventSource = new EventSource("http://localhost:12000/subscribe"); + eventSource.onmessage = function(event) { + element = document.getElementById("event-data"); + element.innerHTML = event.data; + }; + </script> + </head> + <body> + <h1> Welcome to the SSE demo </h1> + <h3> Event data: </h3> + <p id="event-data"></p> + </body> + </html> + """ + + +class Subscribe(resource.Resource): + """ + Implements the subscribe resource + """ + isLeaf = True + + def __init__(self): + self.subscribers = set() + + def render_GET(self, request): + request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') + request.setResponseCode(200) + self.subscribers.add(request) + d = request.notifyFinish() + d.addBoth(self.removeSubscriber) + log.msg("Adding subscriber...") + request.write("") + return server.NOT_DONE_YET + + def publishToAll(self, data): + for subscriber in self.subscribers: + for line in data: + subscriber.write("data: %s\r\n" % line) + # NOTE: the last CRLF is required to dispatch the event at the client + subscriber.write("\r\n") + + def removeSubscriber(self, subscriber): + if subscriber in self.subscribers: + log.msg("Removing subscriber..") + self.subscribers.remove(subscriber) + + +class Publish(resource.Resource): + """ + Implements the publish resource + """ + isLeaf = True + + def __init__(self, subscriber): + self.subscriber = subscriber + + def render_POST(self, request): + if 'data' not in request.args: + request.setResponseCode(400) + return "The parameter 'data' must be set\n" + data = request.args.get('data') + self.subscriber.publishToAll(data) + return 'Thank you for publishing data %s\n' % '\n'.join(data) + + +root = Root() +subscribe = Subscribe() +root.putChild('subscribe', subscribe) +root.putChild('publish', Publish(subscribe)) +site = server.Site(root) +reactor.listenTCP(12000, site) +log.startLogging(sys.stdout) +reactor.run()
--- a/lib/twisted_sse_demo/README.md Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,16 +0,0 @@ -Twisted SSE demo -================ - -A twisted web server that implements server sent events (SSE) - -To run this demo: - - python sse_twisted_web.py - -Open up http://localhost:12000 in your browser. - -To publish events: - - curl -d 'data=Hello!' http://localhost:12000/publish - -You should see the data you publish in your browser. That's it!
--- a/lib/twisted_sse_demo/__init__.py Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -# from https://github.com/juggernaut/twisted-sse-demo
--- a/lib/twisted_sse_demo/api_example.py Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -import time - -from eventsource import EventSource - -EVENTSOURCE_URL = 'http://localhost:12000/subscribe' - -def onmessage(data): - print 'Got payload with data %s' % data - -if __name__ == '__main__': - eventSource = EventSource(EVENTSOURCE_URL) - eventSource.onmessage(onmessage, callInThread=True) - time.sleep(20)
--- a/lib/twisted_sse_demo/eventsource.py Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,79 +0,0 @@ -from twisted.internet import reactor -from twisted.internet.defer import Deferred -from twisted.web.client import Agent -from twisted.web.http_headers import Headers - -from .sse_client import EventSourceProtocol - - -class EventSource(object): - """ - The main EventSource class - """ - def __init__(self, url, userAgent): - # type: (str, bytes) - self.url = url - self.userAgent = userAgent - self.protocol = EventSourceProtocol(self.onConnectionLost) - self.errorHandler = None - self.stashedError = None - self.connect() - - def connect(self): - """ - Connect to the event source URL - """ - agent = Agent(reactor, connectTimeout=5) - self.agent = agent - d = agent.request( - b'GET', - self.url, - Headers({ - b'User-Agent': [self.userAgent], - b'Cache-Control': [b'no-cache'], - b'Accept': [b'text/event-stream; charset=utf-8'], - }), - None) - d.addCallbacks(self.cbRequest, self.connectError) - - def cbRequest(self, response): - if response is None: - # seems out of spec, according to https://twistedmatrix.com/documents/current/api/twisted.web.iweb.IAgent.html - raise ValueError('no response for url %r' % self.url) - elif response.code != 200: - self.callErrorHandler("non 200 response received: %d" % - response.code) - else: - response.deliverBody(self.protocol) - - def connectError(self, ignored): - self.callErrorHandler("error connecting to endpoint: %s" % self.url) - - def onConnectionLost(self, reason): - # overridden - reason.printDetailedTraceback() - - def callErrorHandler(self, msg): - if self.errorHandler: - func, callInThread = self.errorHandler - if callInThread: - reactor.callInThread(func, msg) - else: - func(msg) - else: - self.stashedError = msg - - def onerror(self, func, callInThread=False): - self.errorHandler = func, callInThread - if self.stashedError: - self.callErrorHandler(self.stashedError) - - def onmessage(self, func, callInThread=False): - self.addEventListener('message', func, callInThread) - - def addEventListener(self, event, func, callInThread=False): - assert isinstance(event, bytes), event - callback = func - if callInThread: - callback = lambda data: reactor.callInThread(func, data) - self.protocol.addCallback(event, callback)
--- a/lib/twisted_sse_demo/requirements.txt Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -crochet
--- a/lib/twisted_sse_demo/sse_client.py Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,67 +0,0 @@ -from twisted.protocols.basic import LineReceiver - - -class EventSourceProtocol(LineReceiver): - def __init__(self, onConnectionLost): - self.onConnectionLost = onConnectionLost - self.delimiter = b'\n' - self.MAX_LENGTH = 1 << 20 - self.callbacks = {} - self.finished = None - # Initialize the event and data buffers - self.event = b'message' - self.data = b'' - - def lineLengthExceeded(self, line): - raise NotImplementedError('line too long') - - def setFinishedDeferred(self, d): - self.finished = d - - def addCallback(self, event, func): - self.callbacks[event] = func - - def lineReceived(self, line): - if line == b'': - # Dispatch event - self.dispatchEvent() - else: - try: - field, value = line.split(b':', 1) - # If value starts with a space, strip it. - value = lstrip(value) - except ValueError: - # We got a line with no colon, treat it as a field(ignore) - return - - if field == b'': - # This is a comment; ignore - pass - elif field == b'data': - self.data += value + b'\n' - elif field == b'event': - self.event = value - elif field == b'id': - # Not implemented - pass - elif field == b'retry': - # Not implemented - pass - - def connectionLost(self, reason): - self.onConnectionLost(reason) - - def dispatchEvent(self): - """ - Dispatch the event - """ - # If last character is LF, strip it. - if self.data.endswith(b'\n'): - self.data = self.data[:-1] - if self.event in self.callbacks: - self.callbacks[self.event](self.data) - self.data = b'' - self.event = b'message' - -def lstrip(value): - return value[1:] if value.startswith(b' ') else value
--- a/lib/twisted_sse_demo/sse_server.py Sun Apr 21 23:26:45 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,95 +0,0 @@ -import sys - -from twisted.web import server, resource -from twisted.internet import reactor -from twisted.python import log - - -class Root(resource.Resource): - """ - Root resource; serves JavaScript - """ - def getChild(self, name, request): - if name == '': - return self - return resource.Resource.getChild(self, name, request) - - def render_GET(self, request): - return r""" - <html> - <head> - <script language="JavaScript"> - eventSource = new EventSource("http://localhost:12000/subscribe"); - eventSource.onmessage = function(event) { - element = document.getElementById("event-data"); - element.innerHTML = event.data; - }; - </script> - </head> - <body> - <h1> Welcome to the SSE demo </h1> - <h3> Event data: </h3> - <p id="event-data"></p> - </body> - </html> - """ - - -class Subscribe(resource.Resource): - """ - Implements the subscribe resource - """ - isLeaf = True - - def __init__(self): - self.subscribers = set() - - def render_GET(self, request): - request.setHeader('Content-Type', 'text/event-stream; charset=utf-8') - request.setResponseCode(200) - self.subscribers.add(request) - d = request.notifyFinish() - d.addBoth(self.removeSubscriber) - log.msg("Adding subscriber...") - request.write("") - return server.NOT_DONE_YET - - def publishToAll(self, data): - for subscriber in self.subscribers: - for line in data: - subscriber.write("data: %s\r\n" % line) - # NOTE: the last CRLF is required to dispatch the event at the client - subscriber.write("\r\n") - - def removeSubscriber(self, subscriber): - if subscriber in self.subscribers: - log.msg("Removing subscriber..") - self.subscribers.remove(subscriber) - - -class Publish(resource.Resource): - """ - Implements the publish resource - """ - isLeaf = True - - def __init__(self, subscriber): - self.subscriber = subscriber - - def render_POST(self, request): - if 'data' not in request.args: - request.setResponseCode(400) - return "The parameter 'data' must be set\n" - data = request.args.get('data') - self.subscriber.publishToAll(data) - return 'Thank you for publishing data %s\n' % '\n'.join(data) - - -root = Root() -subscribe = Subscribe() -root.putChild('subscribe', subscribe) -root.putChild('publish', Publish(subscribe)) -site = server.Site(root) -reactor.listenTCP(12000, site) -log.startLogging(sys.stdout) -reactor.run()