comparison lib/mqtt_client/mqtt_client.py @ 1359:8fb419235dc9

some py3 compatibility Ignore-this: 2175eb9b2e4f93e5de3f835914943803 darcs-hash:d6f647a108dfbc9f7d1e146f25b199c5de5ffc6c
author drewp <drewp@bigasterisk.com>
date Wed, 01 May 2019 00:03:00 -0700
parents f99fe03803d4
children e121dc5c09df
comparison
equal deleted inserted replaced
1358:fda1e7bdecb7 1359:8fb419235dc9
40 40
41 def onDisconnection(self, reason): 41 def onDisconnection(self, reason):
42 log.warn("Connection to broker lost: %r", reason) 42 log.warn("Connection to broker lost: %r", reason)
43 self.whenConnected().addCallback(self.connectToBroker) 43 self.whenConnected().addCallback(self.connectToBroker)
44 44
45 def publish(self, topic, msg): 45 def publish(self, topic: bytes, msg: bytes):
46 def _logFailure(failure): 46 def _logFailure(failure):
47 log.warn("publish failed: %s", failure.getErrorMessage()) 47 log.warn("publish failed: %s", failure.getErrorMessage())
48 return failure 48 return failure
49 49
50 return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) 50 return self.protocol.publish(topic=bytearray(topic), qos=0,
51 message=bytearray(msg)).addErrback(_logFailure)
51 52
52 53
53 class MqttClient(object): 54 class MqttClient(object):
54 def __init__(self, brokerHost='bang', brokerPort=1883): 55 def __init__(self, brokerHost='bang', brokerPort=1883):
55 56
62 self.serv.startService() 63 self.serv.startService()
63 64
64 def publish(self, topic, msg): 65 def publish(self, topic, msg):
65 return self.serv.publish(topic, msg) 66 return self.serv.publish(topic, msg)
66 67
67 def subscribe(self, topic): 68 def subscribe(self, topic: bytes):
68 """returns rx.Observable of payload strings""" 69 """returns rx.Observable of payload strings"""
69 # This is surely broken for multiple topics and subscriptions. Might not even 70 # This is surely broken for multiple topics and subscriptions. Might not even
70 # work over a reconnect. 71 # work over a reconnect.
71 72
72 ret = Observable.create(self._observe_msgs) 73 ret = Observable.create(self._observe_msgs)
78 return ret 79 return ret
79 80
80 def _resubscribe(self, topic): 81 def _resubscribe(self, topic):
81 log.info('subscribing %r', topic) 82 log.info('subscribing %r', topic)
82 self.serv.protocol.onPublish = self._onPublish 83 self.serv.protocol.onPublish = self._onPublish
83 return self.serv.protocol.subscribe(topic, 2) 84 return self.serv.protocol.subscribe(topics=[(topic, 2)])
84 85
85 def _observe_msgs(self, observer): 86 def _observe_msgs(self, observer):
86 self.obs = observer 87 self.obs = observer
87 88
88 def _onPublish(self, topic, payload, qos, dup, retain, msgId): 89 def _onPublish(self, topic, payload, qos, dup, retain, msgId):