# HG changeset patch # User drewp # Date 1557477081 25200 # Node ID c887b1cc5e83680da5b6c76709782d75d48f5f59 # Parent a29a55f3429c63821898e6a883b1ee9ce4396af3 caller must set clientId Ignore-this: b6c0f4f20697fa177d42c568269b1c8f darcs-hash:b92509836ada530f47ec0e7d9148ee166bffe23b diff -r a29a55f3429c -r c887b1cc5e83 lib/mqtt_client/mqtt_client.py --- a/lib/mqtt_client/mqtt_client.py Thu May 09 22:31:04 2019 -0700 +++ b/lib/mqtt_client/mqtt_client.py Fri May 10 01:31:21 2019 -0700 @@ -1,20 +1,19 @@ import logging from mqtt.client.factory import MQTTFactory -from rx import Observable from rx.subjects import Subject -from rx.concurrency import TwistedScheduler from twisted.application.internet import ClientService from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks, Deferred +from twisted.internet.defer import inlineCallbacks from twisted.internet.endpoints import clientFromString log = logging.getLogger('mqtt_client') class MQTTService(ClientService): - def __init__(self, endpoint, factory, observersByTopic): + def __init__(self, endpoint, factory, observersByTopic, clientId): self.endpoint = endpoint self.observersByTopic = observersByTopic + self.clientId = clientId ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5) def startService(self): @@ -44,7 +43,7 @@ self.protocol.setWindowSize(1) try: - yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) + yield self.protocol.connect(self.clientId, keepalive=60) except Exception as e: log.error(f"Connecting to {self.endpoint} raised {e!s}") return @@ -75,14 +74,15 @@ class MqttClient(object): - def __init__(self, brokerHost='bang', brokerPort=1883): + def __init__(self, clientId, brokerHost='bang', brokerPort=1883): self.observersByTopic = {} # bytes: Set(observer) factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) - self.serv = MQTTService(myEndpoint, factory, self.observersByTopic) + self.serv = MQTTService(myEndpoint, factory, self.observersByTopic, + clientId) self.serv.startService() def publish(self, topic: bytes, msg: bytes):