Mercurial > code > home > repos > homeauto
changeset 1446:ef03d25cc815
fix import for rx 3.x
Ignore-this: a8944e85d51997a853c90ed1893eb1f6
darcs-hash:216b8893da0be687a7150c14030052bdd67625cd
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 25 Sep 2019 16:08:43 -0700 |
parents | 0087017efecb |
children | 67e8b237d7bf |
files | lib/mqtt_client/mqtt_client.py |
diffstat | 1 files changed, 7 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/mqtt_client/mqtt_client.py Tue Sep 24 14:04:25 2019 -0700 +++ b/lib/mqtt_client/mqtt_client.py Wed Sep 25 16:08:43 2019 -0700 @@ -1,6 +1,6 @@ import logging from mqtt.client.factory import MQTTFactory -from rx.subjects import Subject +import rx.subject from twisted.application.internet import ClientService from twisted.internet import reactor from twisted.internet.defer import inlineCallbacks @@ -35,7 +35,7 @@ log.info('subscribing %r', topics) self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) - + @inlineCallbacks def connectToBroker(self, protocol): self.protocol = protocol @@ -49,19 +49,19 @@ except Exception as e: log.error(f"Connecting to {self.endpoint} raised {e!s}") return - + log.info(f"Connected to {self.endpoint}") self.protocol.onPublish = self._onProtocolMessage self._subscribeAll() - + def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId): topic = topic.encode('ascii') observers = self.observersByTopic.get(topic, []) log.debug(f'received {topic} payload {payload} ({len(observers)} obs)') for obs in observers: obs.on_next(payload) - + def _onProtocolDisconnection(self, reason): log.warn("Connection to broker lost: %r", reason) self.whenConnected().addCallback(self.connectToBroker) @@ -79,7 +79,7 @@ def __init__(self, clientId, brokerHost='bang', brokerPort=1883): self.observersByTopic = {} # bytes: Set(observer) - + factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) @@ -92,7 +92,7 @@ def subscribe(self, topic: bytes): """returns rx.Observable of payload strings""" - ret = Subject() + ret = rx.subject.Subject() self.observersByTopic.setdefault(topic, set()).add(ret) self.serv.ensureSubscribed(topic) return ret