diff lib/twisted_sse/eventsource.py @ 1313:630e0d1846b8

mv twisted_sse Ignore-this: a59fadbe80bc4a393b1dab8228e022c1 darcs-hash:dc218d166be8566000cc59eabaa71b2f9667fe9b
author drewp <drewp@bigasterisk.com>
date Mon, 22 Apr 2019 21:58:09 -0700
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/twisted_sse/eventsource.py	Mon Apr 22 21:58:09 2019 -0700
@@ -0,0 +1,79 @@
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
+from .sse_client import EventSourceProtocol
+
+
+class EventSource(object):
+    """
+    The main EventSource class
+    """
+    def __init__(self, url, userAgent):
+        # type: (str, bytes)
+        self.url = url
+        self.userAgent = userAgent
+        self.protocol = EventSourceProtocol(self.onConnectionLost)
+        self.errorHandler = None
+        self.stashedError = None
+        self.connect()
+
+    def connect(self):
+        """
+        Connect to the event source URL
+        """
+        agent = Agent(reactor, connectTimeout=5)
+        self.agent = agent
+        d = agent.request(
+            b'GET',
+            self.url,
+            Headers({
+                b'User-Agent': [self.userAgent],
+                b'Cache-Control': [b'no-cache'],
+                b'Accept': [b'text/event-stream; charset=utf-8'],
+            }),
+            None)
+        d.addCallbacks(self.cbRequest, self.connectError)
+
+    def cbRequest(self, response):
+        if response is None:
+            # seems out of spec, according to https://twistedmatrix.com/documents/current/api/twisted.web.iweb.IAgent.html
+            raise ValueError('no response for url %r' % self.url)
+        elif response.code != 200:
+            self.callErrorHandler("non 200 response received: %d" %
+                                  response.code)
+        else:
+            response.deliverBody(self.protocol)
+
+    def connectError(self, ignored):
+        self.callErrorHandler("error connecting to endpoint: %s" % self.url)
+
+    def onConnectionLost(self, reason):
+        # overridden
+        reason.printDetailedTraceback()
+        
+    def callErrorHandler(self, msg):
+        if self.errorHandler:
+            func, callInThread = self.errorHandler
+            if callInThread:
+                reactor.callInThread(func, msg)
+            else:
+                func(msg)
+        else:
+            self.stashedError = msg
+
+    def onerror(self, func, callInThread=False):
+        self.errorHandler = func, callInThread
+        if self.stashedError:
+            self.callErrorHandler(self.stashedError)
+
+    def onmessage(self, func, callInThread=False):
+        self.addEventListener('message', func, callInThread)
+
+    def addEventListener(self, event, func, callInThread=False):
+        assert isinstance(event, bytes), event
+        callback = func
+        if callInThread:
+            callback = lambda data: reactor.callInThread(func, data)
+        self.protocol.addCallback(event, callback)