view lib/mqtt_client/mqtt_client.py @ 1369:42f4fed9cd25

no more immediateUpdate since we push patch events now. and the code was broken for py3 anyway Ignore-this: 54edfd3885f749aa5ef3be2051c2f40f darcs-hash:0f1ccb011e9cf29bf9e2389de3833cbda3550631
author drewp <drewp@bigasterisk.com>
date Sun, 05 May 2019 17:09:12 -0700
parents 3cf19717cb6f
children f883166f7ca1
line wrap: on
line source

import logging
from mqtt.client.factory import MQTTFactory
from rx import Observable
from rx.concurrency import TwistedScheduler
from twisted.application.internet import ClientService, backoffPolicy
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.endpoints import clientFromString

log = logging.getLogger('mqtt_client')

class MQTTService(ClientService):

    def __init__(self, endpoint, factory):
        self.endpoint = endpoint
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())

    def startService(self):
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    @inlineCallbacks
    def connectToBroker(self, protocol):
        self.protocol = protocol
        self.protocol.onDisconnection = self.onDisconnection
        # We are issuing 3 publish in a row
        # if order matters, then set window size to 1
        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
        except Exception as e:
            log.error("Connecting to {broker} raised {excp!s}",
                      broker=self.endpoint, excp=e)
        else:
            log.info("Connected to {broker}".format(broker=self.endpoint))
        if getattr(self, 'onMqttConnectionMade', False):
            self.onMqttConnectionMade()

    def onDisconnection(self, reason):
        log.warn("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic: bytes, msg: bytes):
        def _logFailure(failure):
            log.warn("publish failed: %s", failure.getErrorMessage())
            return failure

        return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
                                     message=bytearray(msg)).addErrback(_logFailure)


class MqttClient(object):
    def __init__(self, brokerHost='bang', brokerPort=1883):

        #scheduler = TwistedScheduler(reactor)
        
        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
        myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
        myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
        self.serv = MQTTService(myEndpoint, factory)
        self.serv.startService()
        
    def publish(self, topic, msg):
        return self.serv.publish(topic, msg)

    def subscribe(self, topic: bytes):
        """returns rx.Observable of payload strings"""
        # This is surely broken for multiple topics and subscriptions. Might not even
        # work over a reconnect.
        
        ret = Observable.create(self._observe_msgs)

        self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
        if (hasattr(self.serv, 'protocol') and
            self.serv.protocol.state ==self.serv.protocol.CONNECTED):
            self._resubscribe(topic)
        return ret

    def _resubscribe(self, topic: bytes):
        log.info('subscribing %r', topic)
        self.serv.protocol.onPublish = self._onPublish
        return self.serv.protocol.subscribe(topics=[(topic.decode('utf-8'), 2)])
        
    def _observe_msgs(self, observer):
        self.obs = observer

    def _onPublish(self, topic, payload, qos, dup, retain, msgId):
        log.debug('received payload %r', payload)
        self.obs.on_next(payload)