1357
|
1 import logging
|
|
2 from mqtt.client.factory import MQTTFactory
|
|
3 from rx import Observable
|
|
4 from rx.concurrency import TwistedScheduler
|
|
5 from twisted.application.internet import ClientService, backoffPolicy
|
|
6 from twisted.internet import reactor
|
|
7 from twisted.internet.defer import inlineCallbacks, Deferred
|
|
8 from twisted.internet.endpoints import clientFromString
|
|
9
|
|
10 log = logging.getLogger('mqtt_client')
|
|
11
|
|
12 class MQTTService(ClientService):
|
|
13
|
|
14 def __init__(self, endpoint, factory):
|
|
15 self.endpoint = endpoint
|
|
16 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
|
|
17
|
|
18 def startService(self):
|
|
19 self.whenConnected().addCallback(self.connectToBroker)
|
|
20 ClientService.startService(self)
|
|
21
|
|
22 @inlineCallbacks
|
|
23 def connectToBroker(self, protocol):
|
|
24 self.protocol = protocol
|
|
25 self.protocol.onDisconnection = self.onDisconnection
|
|
26 # We are issuing 3 publish in a row
|
|
27 # if order matters, then set window size to 1
|
|
28 # Publish requests beyond window size are enqueued
|
|
29 self.protocol.setWindowSize(1)
|
|
30
|
|
31 try:
|
|
32 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
|
|
33 except Exception as e:
|
|
34 log.error("Connecting to {broker} raised {excp!s}",
|
|
35 broker=self.endpoint, excp=e)
|
|
36 else:
|
|
37 log.info("Connected to {broker}".format(broker=self.endpoint))
|
|
38 if getattr(self, 'onMqttConnectionMade', False):
|
|
39 self.onMqttConnectionMade()
|
|
40
|
|
41 def onDisconnection(self, reason):
|
|
42 log.warn("Connection to broker lost: %r", reason)
|
|
43 self.whenConnected().addCallback(self.connectToBroker)
|
|
44
|
|
45 def publish(self, topic, msg):
|
|
46 def _logFailure(failure):
|
|
47 log.warn("publish failed: %s", failure.getErrorMessage())
|
|
48 return failure
|
|
49
|
|
50 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
|
|
51
|
|
52
|
|
53 class MqttClient(object):
|
|
54 def __init__(self, brokerHost='bang', brokerPort=1883):
|
|
55
|
|
56 #scheduler = TwistedScheduler(reactor)
|
|
57
|
|
58 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
|
|
59 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
|
|
60 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
|
|
61 self.serv = MQTTService(myEndpoint, factory)
|
|
62 self.serv.startService()
|
|
63
|
|
64 def publish(self, topic, msg):
|
|
65 return self.serv.publish(topic, msg)
|
|
66
|
|
67 def subscribe(self, topic):
|
|
68 """returns rx.Observable of payload strings"""
|
|
69 # This is surely broken for multiple topics and subscriptions. Might not even
|
|
70 # work over a reconnect.
|
|
71
|
|
72 ret = Observable.create(self._observe_msgs)
|
|
73
|
|
74 self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
|
|
75 if (hasattr(self.serv, 'protocol') and
|
|
76 self.serv.protocol.state ==self.serv.protocol.CONNECTED):
|
|
77 self._resubscribe(topic)
|
|
78 return ret
|
|
79
|
|
80 def _resubscribe(self, topic):
|
|
81 log.info('subscribing %r', topic)
|
|
82 self.serv.protocol.onPublish = self._onPublish
|
|
83 return self.serv.protocol.subscribe(topic, 2)
|
|
84
|
|
85 def _observe_msgs(self, observer):
|
|
86 self.obs = observer
|
|
87
|
|
88 def _onPublish(self, topic, payload, qos, dup, retain, msgId):
|
|
89 log.debug('received payload %r', payload)
|
|
90 self.obs.on_next(payload)
|