comparison lib/mqtt_client/mqtt_client.py @ 1446:ef03d25cc815

fix import for rx 3.x Ignore-this: a8944e85d51997a853c90ed1893eb1f6 darcs-hash:216b8893da0be687a7150c14030052bdd67625cd
author drewp <drewp@bigasterisk.com>
date Wed, 25 Sep 2019 16:08:43 -0700
parents 8d165cd29a5b
children
comparison
equal deleted inserted replaced
1445:0087017efecb 1446:ef03d25cc815
1 import logging 1 import logging
2 from mqtt.client.factory import MQTTFactory 2 from mqtt.client.factory import MQTTFactory
3 from rx.subjects import Subject 3 import rx.subject
4 from twisted.application.internet import ClientService 4 from twisted.application.internet import ClientService
5 from twisted.internet import reactor 5 from twisted.internet import reactor
6 from twisted.internet.defer import inlineCallbacks 6 from twisted.internet.defer import inlineCallbacks
7 from twisted.internet.endpoints import clientFromString 7 from twisted.internet.endpoints import clientFromString
8 8
33 if not topics: 33 if not topics:
34 return 34 return
35 log.info('subscribing %r', topics) 35 log.info('subscribing %r', topics)
36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics]) 36 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics])
37 37
38 38
39 @inlineCallbacks 39 @inlineCallbacks
40 def connectToBroker(self, protocol): 40 def connectToBroker(self, protocol):
41 self.protocol = protocol 41 self.protocol = protocol
42 self.protocol.onDisconnection = self._onProtocolDisconnection 42 self.protocol.onDisconnection = self._onProtocolDisconnection
43 43
47 try: 47 try:
48 yield self.protocol.connect(self.clientId, keepalive=60) 48 yield self.protocol.connect(self.clientId, keepalive=60)
49 except Exception as e: 49 except Exception as e:
50 log.error(f"Connecting to {self.endpoint} raised {e!s}") 50 log.error(f"Connecting to {self.endpoint} raised {e!s}")
51 return 51 return
52 52
53 log.info(f"Connected to {self.endpoint}") 53 log.info(f"Connected to {self.endpoint}")
54 54
55 self.protocol.onPublish = self._onProtocolMessage 55 self.protocol.onPublish = self._onProtocolMessage
56 self._subscribeAll() 56 self._subscribeAll()
57 57
58 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId): 58 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
59 topic = topic.encode('ascii') 59 topic = topic.encode('ascii')
60 observers = self.observersByTopic.get(topic, []) 60 observers = self.observersByTopic.get(topic, [])
61 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)') 61 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)')
62 for obs in observers: 62 for obs in observers:
63 obs.on_next(payload) 63 obs.on_next(payload)
64 64
65 def _onProtocolDisconnection(self, reason): 65 def _onProtocolDisconnection(self, reason):
66 log.warn("Connection to broker lost: %r", reason) 66 log.warn("Connection to broker lost: %r", reason)
67 self.whenConnected().addCallback(self.connectToBroker) 67 self.whenConnected().addCallback(self.connectToBroker)
68 68
69 def publish(self, topic: bytes, msg: bytes): 69 def publish(self, topic: bytes, msg: bytes):
77 77
78 class MqttClient(object): 78 class MqttClient(object):
79 def __init__(self, clientId, brokerHost='bang', brokerPort=1883): 79 def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
80 80
81 self.observersByTopic = {} # bytes: Set(observer) 81 self.observersByTopic = {} # bytes: Set(observer)
82 82
83 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) 83 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
84 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) 84 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
85 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) 85 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
86 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic, 86 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic,
87 clientId) 87 clientId)
90 def publish(self, topic: bytes, msg: bytes): 90 def publish(self, topic: bytes, msg: bytes):
91 return self.serv.publish(topic, msg) 91 return self.serv.publish(topic, msg)
92 92
93 def subscribe(self, topic: bytes): 93 def subscribe(self, topic: bytes):
94 """returns rx.Observable of payload strings""" 94 """returns rx.Observable of payload strings"""
95 ret = Subject() 95 ret = rx.subject.Subject()
96 self.observersByTopic.setdefault(topic, set()).add(ret) 96 self.observersByTopic.setdefault(topic, set()).add(ret)
97 self.serv.ensureSubscribed(topic) 97 self.serv.ensureSubscribed(topic)
98 return ret 98 return ret