Mercurial > code > home > repos > homeauto
comparison lib/mqtt_client/mqtt_client.py @ 1446:ef03d25cc815
fix import for rx 3.x
Ignore-this: a8944e85d51997a853c90ed1893eb1f6
darcs-hash:216b8893da0be687a7150c14030052bdd67625cd
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 25 Sep 2019 16:08:43 -0700 |
parents | 8d165cd29a5b |
children |
comparison (legend: equal | deleted | inserted | replaced)
1445:0087017efecb | 1446:ef03d25cc815 |
---|---|
1 import logging | 1 import logging |
2 from mqtt.client.factory import MQTTFactory | 2 from mqtt.client.factory import MQTTFactory |
3 from rx.subjects import Subject | 3 import rx.subject |
4 from twisted.application.internet import ClientService | 4 from twisted.application.internet import ClientService |
5 from twisted.internet import reactor | 5 from twisted.internet import reactor |
6 from twisted.internet.defer import inlineCallbacks | 6 from twisted.internet.defer import inlineCallbacks |
7 from twisted.internet.endpoints import clientFromString | 7 from twisted.internet.endpoints import clientFromString |
8 | 8 |
33 if not topics: | 33 if not topics: |
34 return | 34 return |
35 log.info('subscribing %r', topics) | 35 log.info('subscribing %r', topics) |
36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) | 36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) |
37 | 37 |
38 | 38 |
39 @inlineCallbacks | 39 @inlineCallbacks |
40 def connectToBroker(self, protocol): | 40 def connectToBroker(self, protocol): |
41 self.protocol = protocol | 41 self.protocol = protocol |
42 self.protocol.onDisconnection = self._onProtocolDisconnection | 42 self.protocol.onDisconnection = self._onProtocolDisconnection |
43 | 43 |
47 try: | 47 try: |
48 yield self.protocol.connect(self.clientId, keepalive=60) | 48 yield self.protocol.connect(self.clientId, keepalive=60) |
49 except Exception as e: | 49 except Exception as e: |
50 log.error(f"Connecting to {self.endpoint} raised {e!s}") | 50 log.error(f"Connecting to {self.endpoint} raised {e!s}") |
51 return | 51 return |
52 | 52 |
53 log.info(f"Connected to {self.endpoint}") | 53 log.info(f"Connected to {self.endpoint}") |
54 | 54 |
55 self.protocol.onPublish = self._onProtocolMessage | 55 self.protocol.onPublish = self._onProtocolMessage |
56 self._subscribeAll() | 56 self._subscribeAll() |
57 | 57 |
58 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId): | 58 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId): |
59 topic = topic.encode('ascii') | 59 topic = topic.encode('ascii') |
60 observers = self.observersByTopic.get(topic, []) | 60 observers = self.observersByTopic.get(topic, []) |
61 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)') | 61 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)') |
62 for obs in observers: | 62 for obs in observers: |
63 obs.on_next(payload) | 63 obs.on_next(payload) |
64 | 64 |
65 def _onProtocolDisconnection(self, reason): | 65 def _onProtocolDisconnection(self, reason): |
66 log.warn("Connection to broker lost: %r", reason) | 66 log.warn("Connection to broker lost: %r", reason) |
67 self.whenConnected().addCallback(self.connectToBroker) | 67 self.whenConnected().addCallback(self.connectToBroker) |
68 | 68 |
69 def publish(self, topic: bytes, msg: bytes): | 69 def publish(self, topic: bytes, msg: bytes): |
77 | 77 |
78 class MqttClient(object): | 78 class MqttClient(object): |
79 def __init__(self, clientId, brokerHost='bang', brokerPort=1883): | 79 def __init__(self, clientId, brokerHost='bang', brokerPort=1883): |
80 | 80 |
81 self.observersByTopic = {} # bytes: Set(observer) | 81 self.observersByTopic = {} # bytes: Set(observer) |
82 | 82 |
83 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) | 83 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) |
84 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) | 84 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) |
85 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) | 85 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) |
86 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic, | 86 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic, |
87 clientId) | 87 clientId) |
90 def publish(self, topic: bytes, msg: bytes): | 90 def publish(self, topic: bytes, msg: bytes): |
91 return self.serv.publish(topic, msg) | 91 return self.serv.publish(topic, msg) |
92 | 92 |
93 def subscribe(self, topic: bytes): | 93 def subscribe(self, topic: bytes): |
94 """returns rx.Observable of payload strings""" | 94 """returns rx.Observable of payload strings""" |
95 ret = Subject() | 95 ret = rx.subject.Subject() |
96 self.observersByTopic.setdefault(topic, set()).add(ret) | 96 self.observersByTopic.setdefault(topic, set()).add(ret) |
97 self.serv.ensureSubscribed(topic) | 97 self.serv.ensureSubscribed(topic) |
98 return ret | 98 return ret |