Mercurial > code > home > repos > homeauto
comparison lib/mqtt_client/mqtt_client.py @ 779:bad87b7dc608
subscribe with AT_LEAST_ONCE flag
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Aug 2020 14:02:46 -0700 |
parents | 3d3ad50e2c51 |
children |
comparison
equal
deleted
inserted
replaced
778:acf58b83022f | 779:bad87b7dc608 |
---|---|
5 from twisted.internet import reactor | 5 from twisted.internet import reactor |
6 from twisted.internet.defer import inlineCallbacks | 6 from twisted.internet.defer import inlineCallbacks |
7 from twisted.internet.endpoints import clientFromString | 7 from twisted.internet.endpoints import clientFromString |
8 | 8 |
9 log = logging.getLogger('mqtt_client') | 9 log = logging.getLogger('mqtt_client') |
10 AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE = 0, 1, 2 | |
10 | 11 |
11 class MQTTService(ClientService): | 12 class MQTTService(ClientService): |
12 | 13 |
13 def __init__(self, endpoint, factory, observersByTopic, clientId): | 14 def __init__(self, endpoint, factory, observersByTopic, clientId): |
14 self.endpoint = endpoint | 15 self.endpoint = endpoint |
23 def ensureSubscribed(self, topic: bytes): | 24 def ensureSubscribed(self, topic: bytes): |
24 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic) | 25 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic) |
25 | 26 |
26 def _subscribeToLatestTopic(self, protocol, topic: bytes): | 27 def _subscribeToLatestTopic(self, protocol, topic: bytes): |
27 if protocol.state == protocol.CONNECTED: | 28 if protocol.state == protocol.CONNECTED: |
28 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2)]) | 29 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE)]) |
29 # else it'll get done in the next connectToBroker. | 30 # else it'll get done in the next connectToBroker. |
30 | 31 |
31 def _subscribeAll(self): | 32 def _subscribeAll(self): |
32 topics = list(self.observersByTopic) | 33 topics = list(self.observersByTopic) |
33 if not topics: | 34 if not topics: |
34 return | 35 return |
35 log.info('subscribing %r', topics) | 36 log.info('subscribing %r', topics) |
36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) | 37 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE) for topic in topics]) |
37 | 38 |
38 | 39 |
39 @inlineCallbacks | 40 @inlineCallbacks |
40 def connectToBroker(self, protocol): | 41 def connectToBroker(self, protocol): |
41 self.protocol = protocol | 42 self.protocol = protocol |