Mercurial > code > home > repos > homeauto
diff lib/twisted_sse/eventsource.py @ 1313:630e0d1846b8
mv twisted_sse
Ignore-this: a59fadbe80bc4a393b1dab8228e022c1
darcs-hash:dc218d166be8566000cc59eabaa71b2f9667fe9b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 22 Apr 2019 21:58:09 -0700 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/twisted_sse/eventsource.py Mon Apr 22 21:58:09 2019 -0700 @@ -0,0 +1,79 @@ +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +from .sse_client import EventSourceProtocol + + +class EventSource(object): + """ + The main EventSource class + """ + def __init__(self, url, userAgent): + # type: (str, bytes) + self.url = url + self.userAgent = userAgent + self.protocol = EventSourceProtocol(self.onConnectionLost) + self.errorHandler = None + self.stashedError = None + self.connect() + + def connect(self): + """ + Connect to the event source URL + """ + agent = Agent(reactor, connectTimeout=5) + self.agent = agent + d = agent.request( + b'GET', + self.url, + Headers({ + b'User-Agent': [self.userAgent], + b'Cache-Control': [b'no-cache'], + b'Accept': [b'text/event-stream; charset=utf-8'], + }), + None) + d.addCallbacks(self.cbRequest, self.connectError) + + def cbRequest(self, response): + if response is None: + # seems out of spec, according to https://twistedmatrix.com/documents/current/api/twisted.web.iweb.IAgent.html + raise ValueError('no response for url %r' % self.url) + elif response.code != 200: + self.callErrorHandler("non 200 response received: %d" % + response.code) + else: + response.deliverBody(self.protocol) + + def connectError(self, ignored): + self.callErrorHandler("error connecting to endpoint: %s" % self.url) + + def onConnectionLost(self, reason): + # overridden + reason.printDetailedTraceback() + + def callErrorHandler(self, msg): + if self.errorHandler: + func, callInThread = self.errorHandler + if callInThread: + reactor.callInThread(func, msg) + else: + func(msg) + else: + self.stashedError = msg + + def onerror(self, func, callInThread=False): + self.errorHandler = func, callInThread + if self.stashedError: + self.callErrorHandler(self.stashedError) + + def onmessage(self, func, callInThread=False): + self.addEventListener('message', func, callInThread) + + def addEventListener(self, event, func, callInThread=False): + assert isinstance(event, bytes), event + callback = func + if callInThread: + callback = lambda data: reactor.callInThread(func, data) + self.protocol.addCallback(event, callback)