Mercurial > code > home > repos > homeauto
annotate lib/mqtt_client/mqtt_client.py @ 1361:e121dc5c09df
mqtt lib wants to encode topic names
Ignore-this: 2b72670ac614943e7188419e1aba93b
darcs-hash:59524f522f3e7922db38552ea95978df7254cbb6
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 01 May 2019 00:06:50 -0700 |
parents | 8fb419235dc9 |
children | 3cf19717cb6f |
rev | line source |
---|---|
1357 | 1 import logging |
2 from mqtt.client.factory import MQTTFactory | |
3 from rx import Observable | |
4 from rx.concurrency import TwistedScheduler | |
5 from twisted.application.internet import ClientService, backoffPolicy | |
6 from twisted.internet import reactor | |
7 from twisted.internet.defer import inlineCallbacks, Deferred | |
8 from twisted.internet.endpoints import clientFromString | |
9 | |
10 log = logging.getLogger('mqtt_client') | |
11 | |
12 class MQTTService(ClientService): | |
13 | |
14 def __init__(self, endpoint, factory): | |
15 self.endpoint = endpoint | |
16 ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) | |
17 | |
18 def startService(self): | |
19 self.whenConnected().addCallback(self.connectToBroker) | |
20 ClientService.startService(self) | |
21 | |
22 @inlineCallbacks | |
23 def connectToBroker(self, protocol): | |
24 self.protocol = protocol | |
25 self.protocol.onDisconnection = self.onDisconnection | |
26 # We are issuing 3 publish in a row | |
27 # if order matters, then set window size to 1 | |
28 # Publish requests beyond window size are enqueued | |
29 self.protocol.setWindowSize(1) | |
30 | |
31 try: | |
32 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) | |
33 except Exception as e: | |
34 log.error("Connecting to {broker} raised {excp!s}", | |
35 broker=self.endpoint, excp=e) | |
36 else: | |
37 log.info("Connected to {broker}".format(broker=self.endpoint)) | |
38 if getattr(self, 'onMqttConnectionMade', False): | |
39 self.onMqttConnectionMade() | |
40 | |
41 def onDisconnection(self, reason): | |
42 log.warn("Connection to broker lost: %r", reason) | |
43 self.whenConnected().addCallback(self.connectToBroker) | |
44 | |
1359 | 45 def publish(self, topic: bytes, msg: bytes): |
1357 | 46 def _logFailure(failure): |
47 log.warn("publish failed: %s", failure.getErrorMessage()) | |
48 return failure | |
49 | |
1359 | 50 return self.protocol.publish(topic=bytearray(topic), qos=0, |
51 message=bytearray(msg)).addErrback(_logFailure) | |
1357 | 52 |
53 | |
54 class MqttClient(object): | |
55 def __init__(self, brokerHost='bang', brokerPort=1883): | |
56 | |
57 #scheduler = TwistedScheduler(reactor) | |
58 | |
59 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) | |
60 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) | |
61 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) | |
62 self.serv = MQTTService(myEndpoint, factory) | |
63 self.serv.startService() | |
64 | |
65 def publish(self, topic, msg): | |
66 return self.serv.publish(topic, msg) | |
67 | |
1359 | 68 def subscribe(self, topic: bytes): |
1357 | 69 """returns rx.Observable of payload strings""" |
70 # This is surely broken for multiple topics and subscriptions. Might not even | |
71 # work over a reconnect. | |
72 | |
73 ret = Observable.create(self._observe_msgs) | |
74 | |
75 self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic) | |
76 if (hasattr(self.serv, 'protocol') and | |
77 self.serv.protocol.state ==self.serv.protocol.CONNECTED): | |
78 self._resubscribe(topic) | |
79 return ret | |
80 | |
1361
e121dc5c09df
mqtt lib wants to encode topic names
drewp <drewp@bigasterisk.com>
parents:
1359
diff
changeset
|
81 def _resubscribe(self, topic: bytes): |
1357 | 82 log.info('subscribing %r', topic) |
83 self.serv.protocol.onPublish = self._onPublish | |
1361
e121dc5c09df
mqtt lib wants to encode topic names
drewp <drewp@bigasterisk.com>
parents:
1359
diff
changeset
|
84 return self.serv.protocol.subscribe(topics=[(topic.decode('utf-8'), 2)]) |
1357 | 85 |
86 def _observe_msgs(self, observer): | |
87 self.obs = observer | |
88 | |
89 def _onPublish(self, topic, payload, qos, dup, retain, msgId): | |
90 log.debug('received payload %r', payload) | |
91 self.obs.on_next(payload) |