changeset 1446:ef03d25cc815

fix import for rx 3.x Ignore-this: a8944e85d51997a853c90ed1893eb1f6 darcs-hash:216b8893da0be687a7150c14030052bdd67625cd
author drewp <drewp@bigasterisk.com>
date Wed, 25 Sep 2019 16:08:43 -0700
parents 0087017efecb
children 67e8b237d7bf
files lib/mqtt_client/mqtt_client.py
diffstat 1 files changed, 7 insertions(+), 7 deletions(-) [+]
line wrap: on
line diff
--- a/lib/mqtt_client/mqtt_client.py	Tue Sep 24 14:04:25 2019 -0700
+++ b/lib/mqtt_client/mqtt_client.py	Wed Sep 25 16:08:43 2019 -0700
@@ -1,6 +1,6 @@
 import logging
 from mqtt.client.factory import MQTTFactory
-from rx.subjects import Subject
+import rx.subject
 from twisted.application.internet import ClientService
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks
@@ -35,7 +35,7 @@
         log.info('subscribing %r', topics)
         self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics])
 
-        
+
     @inlineCallbacks
     def connectToBroker(self, protocol):
         self.protocol = protocol
@@ -49,19 +49,19 @@
         except Exception as e:
             log.error(f"Connecting to {self.endpoint} raised {e!s}")
             return
-        
+
         log.info(f"Connected to {self.endpoint}")
 
         self.protocol.onPublish = self._onProtocolMessage
         self._subscribeAll()
-            
+
     def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
         topic = topic.encode('ascii')
         observers = self.observersByTopic.get(topic, [])
         log.debug(f'received {topic} payload {payload} ({len(observers)} obs)')
         for obs in observers:
             obs.on_next(payload)
-            
+
     def _onProtocolDisconnection(self, reason):
         log.warn("Connection to broker lost: %r", reason)
         self.whenConnected().addCallback(self.connectToBroker)
@@ -79,7 +79,7 @@
     def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
 
         self.observersByTopic = {} # bytes: Set(observer)
-        
+
         factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
         myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
         myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
@@ -92,7 +92,7 @@
 
     def subscribe(self, topic: bytes):
         """returns rx.Observable of payload strings"""
-        ret = Subject()
+        ret = rx.subject.Subject()
         self.observersByTopic.setdefault(topic, set()).add(ret)
         self.serv.ensureSubscribed(topic)
         return ret