changeset 1385:c887b1cc5e83

caller must set clientId Ignore-this: b6c0f4f20697fa177d42c568269b1c8f darcs-hash:b92509836ada530f47ec0e7d9148ee166bffe23b
author drewp <drewp@bigasterisk.com>
date Fri, 10 May 2019 01:31:21 -0700
parents a29a55f3429c
children 4e161b451e56
files lib/mqtt_client/mqtt_client.py
diffstat 1 files changed, 7 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/lib/mqtt_client/mqtt_client.py	Thu May 09 22:31:04 2019 -0700
+++ b/lib/mqtt_client/mqtt_client.py	Fri May 10 01:31:21 2019 -0700
@@ -1,20 +1,19 @@
 import logging
 from mqtt.client.factory import MQTTFactory
-from rx import Observable
 from rx.subjects import Subject
-from rx.concurrency import TwistedScheduler
 from twisted.application.internet import ClientService
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.internet.defer import inlineCallbacks
 from twisted.internet.endpoints import clientFromString
 
 log = logging.getLogger('mqtt_client')
 
 class MQTTService(ClientService):
 
-    def __init__(self, endpoint, factory, observersByTopic):
+    def __init__(self, endpoint, factory, observersByTopic, clientId):
         self.endpoint = endpoint
         self.observersByTopic = observersByTopic
+        self.clientId = clientId
         ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)
 
     def startService(self):
@@ -44,7 +43,7 @@
         self.protocol.setWindowSize(1)
 
         try:
-            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
+            yield self.protocol.connect(self.clientId, keepalive=60)
         except Exception as e:
             log.error(f"Connecting to {self.endpoint} raised {e!s}")
             return
@@ -75,14 +74,15 @@
 
 
 class MqttClient(object):
-    def __init__(self, brokerHost='bang', brokerPort=1883):
+    def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
 
         self.observersByTopic = {} # bytes: Set(observer)
         
         factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
         myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
         myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
-        self.serv = MQTTService(myEndpoint, factory, self.observersByTopic)
+        self.serv = MQTTService(myEndpoint, factory, self.observersByTopic,
+                                clientId)
         self.serv.startService()
 
     def publish(self, topic: bytes, msg: bytes):