comparison lib/mqtt_client/mqtt_client.py @ 779:bad87b7dc608

subscribe with AT_LEAST_ONCE flag
author drewp@bigasterisk.com
date Sat, 08 Aug 2020 14:02:46 -0700
parents 3d3ad50e2c51
children
comparison
equal deleted inserted replaced
778:acf58b83022f 779:bad87b7dc608
5 from twisted.internet import reactor 5 from twisted.internet import reactor
6 from twisted.internet.defer import inlineCallbacks 6 from twisted.internet.defer import inlineCallbacks
7 from twisted.internet.endpoints import clientFromString 7 from twisted.internet.endpoints import clientFromString
8 8
9 log = logging.getLogger('mqtt_client') 9 log = logging.getLogger('mqtt_client')
10 AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE = 0, 1, 2
10 11
11 class MQTTService(ClientService): 12 class MQTTService(ClientService):
12 13
13 def __init__(self, endpoint, factory, observersByTopic, clientId): 14 def __init__(self, endpoint, factory, observersByTopic, clientId):
14 self.endpoint = endpoint 15 self.endpoint = endpoint
23 def ensureSubscribed(self, topic: bytes): 24 def ensureSubscribed(self, topic: bytes):
24 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic) 25 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)
25 26
26 def _subscribeToLatestTopic(self, protocol, topic: bytes): 27 def _subscribeToLatestTopic(self, protocol, topic: bytes):
27 if protocol.state == protocol.CONNECTED: 28 if protocol.state == protocol.CONNECTED:
28 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2)]) 29 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE)])
29 # else it'll get done in the next connectToBroker. 30 # else it'll get done in the next connectToBroker.
30 31
31 def _subscribeAll(self): 32 def _subscribeAll(self):
32 topics = list(self.observersByTopic) 33 topics = list(self.observersByTopic)
33 if not topics: 34 if not topics:
34 return 35 return
35 log.info('subscribing %r', topics) 36 log.info('subscribing %r', topics)
36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) 37 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE) for topic in topics])
37 38
38 39
39 @inlineCallbacks 40 @inlineCallbacks
40 def connectToBroker(self, protocol): 41 def connectToBroker(self, protocol):
41 self.protocol = protocol 42 self.protocol = protocol