Mercurial > code > home > repos > homeauto
comparison lib/twisted_sse_demo/eventsource.py @ 351:7716b1810d6c
reasoning & collector move into docker images
Ignore-this: 67e97d307eba96791cbe77e57c57ad57
author | drewp@bigasterisk.com |
---|---|
date | Mon, 03 Sep 2018 00:45:34 -0700 |
parents | service/reasoning/twisted_sse_demo/eventsource.py@29f593aee67b |
children | 59067d81a296 |
comparison
equal
deleted
inserted
replaced
350:a380561fd8a8 | 351:7716b1810d6c |
---|---|
1 from crochet import setup, run_in_reactor | |
2 from twisted.internet import reactor | |
3 from twisted.internet.defer import Deferred | |
4 from twisted.web.client import Agent | |
5 from twisted.web.http_headers import Headers | |
6 | |
7 from sse_client import EventSourceProtocol | |
8 | |
# Initialise crochet once, at import time. EventSource.connect below is
# decorated with crochet's run_in_reactor.
# NOTE(review): presumably setup() has to run before any run_in_reactor
# function is called -- confirm against the crochet documentation.
setup()
10 | |
11 | |
class EventSource(object):
    """
    Client for a Server-Sent Events (SSE) endpoint.

    Creating an instance immediately starts a GET request to ``url`` (see
    ``connect``). Event callbacks are registered with ``onmessage`` or
    ``addEventListener``; failures are reported to the handler registered
    with ``onerror``.
    """
    def __init__(self, url):
        """
        Store ``url``, create the protocol that will receive the response
        body, and start connecting.

        :param url: address of the event stream; passed unchanged to
            ``Agent.request``.
        """
        self.url = url
        self.protocol = EventSourceProtocol()
        # Becomes a (func, callInThread) tuple once onerror() is called.
        self.errorHandler = None
        # Error message reported before any handler was registered.
        self.stashedError = None
        self.connect()

    @run_in_reactor
    def connect(self):
        """
        Connect to the event source URL

        Issues a GET request for ``self.url``. A successful response is
        passed to ``cbRequest``; a failed request is passed to
        ``connectError``.
        """
        agent = Agent(reactor)
        d = agent.request(
            'GET',
            self.url,
            Headers({
                'User-Agent': ['Twisted SSE Client'],
                'Cache-Control': ['no-cache'],
                'Accept': ['text/event-stream; charset=utf-8'],
            }),
            None)
        # Register both handlers at the same level of the chain. Adding the
        # errback first and the callback second would run cbRequest(None)
        # after every connection failure, because connectError returns None
        # and thereby switches the Deferred back to its callback chain.
        d.addCallbacks(self.cbRequest, self.connectError)

    def cbRequest(self, response):
        """
        Handle the response to the GET request.

        A status other than 200 is reported through ``callErrorHandler``
        and nothing is returned. Otherwise the body is delivered to
        ``self.protocol`` and the Deferred handed to the protocol via
        ``setFinishedDeferred`` is returned (presumably fired by the
        protocol when the stream ends).
        """
        if response.code != 200:
            self.callErrorHandler("non 200 response received: %d" %
                                  response.code)
            return None
        finished = Deferred()
        self.protocol.setFinishedDeferred(finished)
        response.deliverBody(self.protocol)
        return finished

    def connectError(self, ignored):
        """
        Errback for the request: report that the endpoint is unreachable.

        The failure itself is not inspected; only ``self.url`` is included
        in the message.
        """
        self.callErrorHandler("error connecting to endpoint: %s" % self.url)

    def callErrorHandler(self, msg):
        """
        Pass ``msg`` to the registered error handler.

        If no handler has been registered yet, the message is kept in
        ``self.stashedError`` so that ``onerror`` can deliver it later.
        When the handler was registered with ``callInThread`` true, it is
        invoked through ``reactor.callInThread``; otherwise it is called
        directly.
        """
        if self.errorHandler is None:
            self.stashedError = msg
            return
        func, callInThread = self.errorHandler
        if callInThread:
            reactor.callInThread(func, msg)
        else:
            func(msg)

    def onerror(self, func, callInThread=False):
        """
        Register ``func`` as the error handler.

        :param func: callable taking the error message string.
        :param callInThread: when true, ``func`` is invoked through
            ``reactor.callInThread`` instead of being called directly.

        An error that was stashed before any handler existed is delivered
        to ``func`` right away.
        """
        self.errorHandler = func, callInThread
        stashed = self.stashedError
        if stashed is not None:
            # Clear the stash before delivering so that a handler
            # registered later does not receive the same stale error again.
            self.stashedError = None
            self.callErrorHandler(stashed)

    def onmessage(self, func, callInThread=False):
        """
        Register ``func`` for events named ``'message'``.

        Shorthand for ``addEventListener('message', func, callInThread)``.
        """
        self.addEventListener('message', func, callInThread)

    def addEventListener(self, event, func, callInThread=False):
        """
        Register ``func`` as a callback for ``event`` on the protocol.

        :param event: event name passed to ``self.protocol.addCallback``.
        :param func: callable taking the event data.
        :param callInThread: when true, ``func`` is wrapped so that each
            invocation goes through ``reactor.callInThread``.
        """
        callback = func
        if callInThread:
            callback = lambda data: reactor.callInThread(func, data)
        self.protocol.addCallback(event, callback)