annotate lib/mqtt_client.py @ 1348:e681221ab8a3

release 0.4.0 Ignore-this: fa3be75cf8765569c6498f743858ad48 darcs-hash:9803849d98c7a18ed90df08f907749c1ee1d037d
author drewp <drewp@bigasterisk.com>
date Thu, 25 Apr 2019 23:01:26 -0700
parents 6561367aa60a
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1183
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
1 from mqtt.client.factory import MQTTFactory
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
2 from twisted.application.internet import ClientService, backoffPolicy
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
3 from twisted.internet.defer import inlineCallbacks, Deferred
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
4 from twisted.internet.endpoints import clientFromString
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
5 import logging
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
6 from twisted.internet import reactor
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
7 from rx.concurrency import TwistedScheduler
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
8 from rx import Observable
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
9
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
10 log = logging.getLogger('mqtt_client')
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
11
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
12 class MQTTService(ClientService):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
13
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
14 def __init__(self, endpoint, factory):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
15 self.endpoint = endpoint
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
16 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
17
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
18 def startService(self):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
19 self.whenConnected().addCallback(self.connectToBroker)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
20 ClientService.startService(self)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
21
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
22 @inlineCallbacks
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
23 def connectToBroker(self, protocol):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
24 self.protocol = protocol
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
25 self.protocol.onDisconnection = self.onDisconnection
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
26 # We are issuing 3 publish in a row
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
27 # if order matters, then set window size to 1
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
28 # Publish requests beyond window size are enqueued
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
29 self.protocol.setWindowSize(1)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
30
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
31 try:
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
32 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
33 except Exception as e:
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
34 log.error("Connecting to {broker} raised {excp!s}",
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
35 broker=self.endpoint, excp=e)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
36 else:
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
37 log.info("Connected to {broker}".format(broker=self.endpoint))
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
38 if getattr(self, 'onMqttConnectionMade', False):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
39 self.onMqttConnectionMade()
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
40
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
41 def onDisconnection(self, reason):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
42 log.warn("Connection to broker lost: %r", reason)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
43 self.whenConnected().addCallback(self.connectToBroker)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
44
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
45 def publish(self, topic, msg):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
46 def _logFailure(failure):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
47 log.warn("publish failed: %s", failure.getErrorMessage())
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
48 return failure
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
49
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
50 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
51
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
52
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
53 class MqttClient(object):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
54 def __init__(self, brokerHost='bang', brokerPort=1883):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
55
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
56 #scheduler = TwistedScheduler(reactor)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
57
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
58 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
59 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
60 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
61 self.serv = MQTTService(myEndpoint, factory)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
62 self.serv.startService()
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
63
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
64 def publish(self, topic, msg):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
65 return self.serv.publish(topic, msg)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
66
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
67 def subscribe(self, topic):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
68 """returns rx.Observable of payload strings"""
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
69 # This is surely broken for multiple topics and subscriptions. Might not even
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
70 # work over a reconnect.
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
71
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
72 ret = Observable.create(self._observe_msgs)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
73
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
74 self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
75 if (hasattr(self.serv, 'protocol') and
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
76 self.serv.protocol.state ==self.serv.protocol.CONNECTED):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
77 self._resubscribe(topic)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
78 return ret
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
79
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
80 def _resubscribe(self, topic):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
81 log.info('subscribing %r', topic)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
82 self.serv.protocol.onPublish = self._onPublish
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
83 return self.serv.protocol.subscribe(topic, 2)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
84
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
85 def _observe_msgs(self, observer):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
86 self.obs = observer
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
87
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
88 def _onPublish(self, topic, payload, qos, dup, retain, msgId):
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
89 log.debug('received payload %r', payload)
6561367aa60a factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff changeset
90 self.obs.on_next(payload)