Mercurial > code > home > repos > homeauto
diff lib/mqtt_client/mqtt_client.py @ 1357:f99fe03803d4
mqtt_client into a distributable
Ignore-this: 8c99f2f6220b55e32df3349d02f725c7
darcs-hash:52741036605ef03b0452f03c0151bd1bbed07fdc
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 30 Apr 2019 23:37:25 -0700 |
parents | |
children | 8fb419235dc9 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/mqtt_client/mqtt_client.py Tue Apr 30 23:37:25 2019 -0700 @@ -0,0 +1,90 @@ +import logging +from mqtt.client.factory import MQTTFactory +from rx import Observable +from rx.concurrency import TwistedScheduler +from twisted.application.internet import ClientService, backoffPolicy +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks, Deferred +from twisted.internet.endpoints import clientFromString + +log = logging.getLogger('mqtt_client') + +class MQTTService(ClientService): + + def __init__(self, endpoint, factory): + self.endpoint = endpoint + ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) + + def startService(self): + self.whenConnected().addCallback(self.connectToBroker) + ClientService.startService(self) + + @inlineCallbacks + def connectToBroker(self, protocol): + self.protocol = protocol + self.protocol.onDisconnection = self.onDisconnection + # We are issuing 3 publish in a row + # if order matters, then set window size to 1 + # Publish requests beyond window size are enqueued + self.protocol.setWindowSize(1) + + try: + yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) + except Exception as e: + log.error("Connecting to {broker} raised {excp!s}", + broker=self.endpoint, excp=e) + else: + log.info("Connected to {broker}".format(broker=self.endpoint)) + if getattr(self, 'onMqttConnectionMade', False): + self.onMqttConnectionMade() + + def onDisconnection(self, reason): + log.warn("Connection to broker lost: %r", reason) + self.whenConnected().addCallback(self.connectToBroker) + + def publish(self, topic, msg): + def _logFailure(failure): + log.warn("publish failed: %s", failure.getErrorMessage()) + return failure + + return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) + + +class MqttClient(object): + def __init__(self, brokerHost='bang', brokerPort=1883): + + #scheduler = TwistedScheduler(reactor) + + factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) + myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) + myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) + self.serv = MQTTService(myEndpoint, factory) + self.serv.startService() + + def publish(self, topic, msg): + return self.serv.publish(topic, msg) + + def subscribe(self, topic): + """returns rx.Observable of payload strings""" + # This is surely broken for multiple topics and subscriptions. Might not even + # work over a reconnect. + + ret = Observable.create(self._observe_msgs) + + self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic) + if (hasattr(self.serv, 'protocol') and + self.serv.protocol.state ==self.serv.protocol.CONNECTED): + self._resubscribe(topic) + return ret + + def _resubscribe(self, topic): + log.info('subscribing %r', topic) + self.serv.protocol.onPublish = self._onPublish + return self.serv.protocol.subscribe(topic, 2) + + def _observe_msgs(self, observer): + self.obs = observer + + def _onPublish(self, topic, payload, qos, dup, retain, msgId): + log.debug('received payload %r', payload) + self.obs.on_next(payload)