comparison lib/twisted_sse_demo/eventsource.py @ 351:7716b1810d6c

reasoning & collector move into docker images Ignore-this: 67e97d307eba96791cbe77e57c57ad57
author drewp@bigasterisk.com
date Mon, 03 Sep 2018 00:45:34 -0700
parents service/reasoning/twisted_sse_demo/eventsource.py@29f593aee67b
children 59067d81a296
comparison
equal deleted inserted replaced
350:a380561fd8a8 351:7716b1810d6c
1 from crochet import setup, run_in_reactor
2 from twisted.internet import reactor
3 from twisted.internet.defer import Deferred
4 from twisted.web.client import Agent
5 from twisted.web.http_headers import Headers
6
7 from sse_client import EventSourceProtocol
8
# Initialise crochet at import time. NOTE(review): per crochet's documentation
# setup() starts the Twisted reactor in a background thread and must run
# before any @run_in_reactor-decorated function is called -- confirm against
# the installed crochet version.
9 setup()
10
11
class EventSource(object):
    """
    Client for a Server-Sent Events endpoint.

    Creating an instance immediately issues a GET request for ``url`` and
    hands the response body to an ``EventSourceProtocol``.  Event listeners
    are registered with ``onmessage`` / ``addEventListener``; connection and
    HTTP-status problems are reported as a message string to the handler
    registered with ``onerror``.
    """

    def __init__(self, url):
        """
        Store the URL, create the protocol and start connecting.

        :param url: address of the event stream; passed unchanged to
            ``Agent.request``.
        """
        self.url = url
        self.protocol = EventSourceProtocol()
        # Becomes a ``(func, callInThread)`` tuple once onerror() is called.
        self.errorHandler = None
        # Last error message reported while no handler was registered.
        self.stashedError = None
        self.connect()

    @run_in_reactor
    def connect(self):
        """
        Connect to the event source URL
        """
        agent = Agent(reactor)
        # NOTE(review): on Python 3, Twisted documents the method and URI
        # arguments of Agent.request as bytes -- confirm the target runtime
        # before porting.
        d = agent.request(
            'GET',
            self.url,
            Headers({
                'User-Agent': ['Twisted SSE Client'],
                'Cache-Control': ['no-cache'],
                'Accept': ['text/event-stream; charset=utf-8'],
            }),
            None)
        # Register the success and failure handlers as one pair so exactly
        # one of them runs.  Chaining addErrback(connectError) followed by
        # addCallback(cbRequest) would let connectError's None return value
        # flow into cbRequest, which then fails on ``None.code``.
        d.addCallbacks(self.cbRequest, self.connectError)

    def cbRequest(self, response):
        """
        Handle the response to the GET request.

        A status other than 200 is reported through ``callErrorHandler`` and
        nothing is returned.  For a 200 response a fresh ``Deferred`` is
        given to the protocol via ``setFinishedDeferred``, the body is
        delivered to the protocol, and that ``Deferred`` is returned.

        :param response: response object providing ``code`` and
            ``deliverBody``.
        """
        if response.code != 200:
            self.callErrorHandler("non 200 response received: %d" %
                                  response.code)
        else:
            finished = Deferred()
            self.protocol.setFinishedDeferred(finished)
            response.deliverBody(self.protocol)
            return finished

    def connectError(self, ignored):
        """
        Report a failed request; the failure details are not used.

        :param ignored: the failure passed by the Deferred (unused).
        """
        self.callErrorHandler("error connecting to endpoint: %s" % self.url)

    def callErrorHandler(self, msg):
        """
        Deliver ``msg`` to the registered error handler, or stash it.

        With a handler registered, it is called with ``msg`` either directly
        or through ``reactor.callInThread``, depending on the flag given to
        ``onerror``.  Without a handler, ``msg`` replaces any previously
        stashed message so ``onerror`` can deliver it later.

        :param msg: error message string.
        """
        if self.errorHandler:
            func, callInThread = self.errorHandler
            if callInThread:
                reactor.callInThread(func, msg)
            else:
                func(msg)
        else:
            self.stashedError = msg

    def onerror(self, func, callInThread=False):
        """
        Register the error handler, replacing any previous one.

        If an error was stashed before a handler existed, it is delivered
        to ``func`` immediately.

        :param func: callable taking the error message string.
        :param callInThread: when true, ``func`` is invoked through
            ``reactor.callInThread`` instead of being called directly.
        """
        self.errorHandler = func, callInThread
        if self.stashedError:
            self.callErrorHandler(self.stashedError)

    def onmessage(self, func, callInThread=False):
        """
        Register ``func`` as a listener for the ``'message'`` event.

        :param func: callable taking the event data.
        :param callInThread: forwarded to ``addEventListener``.
        """
        self.addEventListener('message', func, callInThread)

    def addEventListener(self, event, func, callInThread=False):
        """
        Register ``func`` with the protocol for the named event.

        :param event: event name passed to the protocol's ``addCallback``.
        :param func: callable taking the event data.
        :param callInThread: when true, the protocol is given a wrapper
            that dispatches ``func`` through ``reactor.callInThread``.
        """
        callback = func
        if callInThread:
            callback = lambda data: reactor.callInThread(func, data)
        self.protocol.addCallback(event, callback)