Mercurial > code > home > repos > homeauto
comparison lib/mqtt_client/mqtt_client.py @ 1359:8fb419235dc9
some py3 compatibility
Ignore-this: 2175eb9b2e4f93e5de3f835914943803
darcs-hash:d6f647a108dfbc9f7d1e146f25b199c5de5ffc6c
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 01 May 2019 00:03:00 -0700 |
parents | f99fe03803d4 |
children | e121dc5c09df |
comparison
equal
deleted
inserted
replaced
1358:fda1e7bdecb7 | 1359:8fb419235dc9 |
---|---|
40 | 40 |
41 def onDisconnection(self, reason): | 41 def onDisconnection(self, reason): |
42 log.warn("Connection to broker lost: %r", reason) | 42 log.warn("Connection to broker lost: %r", reason) |
43 self.whenConnected().addCallback(self.connectToBroker) | 43 self.whenConnected().addCallback(self.connectToBroker) |
44 | 44 |
45 def publish(self, topic, msg): | 45 def publish(self, topic: bytes, msg: bytes): |
46 def _logFailure(failure): | 46 def _logFailure(failure): |
47 log.warn("publish failed: %s", failure.getErrorMessage()) | 47 log.warn("publish failed: %s", failure.getErrorMessage()) |
48 return failure | 48 return failure |
49 | 49 |
50 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) | 50 return self.protocol.publish(topic=bytearray(topic), qos=0, |
51 message=bytearray(msg)).addErrback(_logFailure) | |
51 | 52 |
52 | 53 |
53 class MqttClient(object): | 54 class MqttClient(object): |
54 def __init__(self, brokerHost='bang', brokerPort=1883): | 55 def __init__(self, brokerHost='bang', brokerPort=1883): |
55 | 56 |
62 self.serv.startService() | 63 self.serv.startService() |
63 | 64 |
64 def publish(self, topic, msg): | 65 def publish(self, topic, msg): |
65 return self.serv.publish(topic, msg) | 66 return self.serv.publish(topic, msg) |
66 | 67 |
67 def subscribe(self, topic): | 68 def subscribe(self, topic: bytes): |
68 """returns rx.Observable of payload strings""" | 69 """returns rx.Observable of payload strings""" |
69 # This is surely broken for multiple topics and subscriptions. Might not even | 70 # This is surely broken for multiple topics and subscriptions. Might not even |
70 # work over a reconnect. | 71 # work over a reconnect. |
71 | 72 |
72 ret = Observable.create(self._observe_msgs) | 73 ret = Observable.create(self._observe_msgs) |
78 return ret | 79 return ret |
79 | 80 |
80 def _resubscribe(self, topic): | 81 def _resubscribe(self, topic): |
81 log.info('subscribing %r', topic) | 82 log.info('subscribing %r', topic) |
82 self.serv.protocol.onPublish = self._onPublish | 83 self.serv.protocol.onPublish = self._onPublish |
83 return self.serv.protocol.subscribe(topic, 2) | 84 return self.serv.protocol.subscribe(topics=[(topic, 2)]) |
84 | 85 |
85 def _observe_msgs(self, observer): | 86 def _observe_msgs(self, observer): |
86 self.obs = observer | 87 self.obs = observer |
87 | 88 |
88 def _onPublish(self, topic, payload, qos, dup, retain, msgId): | 89 def _onPublish(self, topic, payload, qos, dup, retain, msgId): |