comparison lib/mqtt_client/mqtt_client.py @ 1385:c887b1cc5e83

caller must set clientId Ignore-this: b6c0f4f20697fa177d42c568269b1c8f darcs-hash:b92509836ada530f47ec0e7d9148ee166bffe23b
author drewp <drewp@bigasterisk.com>
date Fri, 10 May 2019 01:31:21 -0700
parents f883166f7ca1
children 8d165cd29a5b
comparison
equal deleted inserted replaced
1384:a29a55f3429c 1385:c887b1cc5e83
1 import logging 1 import logging
2 from mqtt.client.factory import MQTTFactory 2 from mqtt.client.factory import MQTTFactory
3 from rx import Observable
4 from rx.subjects import Subject 3 from rx.subjects import Subject
5 from rx.concurrency import TwistedScheduler
6 from twisted.application.internet import ClientService 4 from twisted.application.internet import ClientService
7 from twisted.internet import reactor 5 from twisted.internet import reactor
8 from twisted.internet.defer import inlineCallbacks, Deferred 6 from twisted.internet.defer import inlineCallbacks
9 from twisted.internet.endpoints import clientFromString 7 from twisted.internet.endpoints import clientFromString
10 8
11 log = logging.getLogger('mqtt_client') 9 log = logging.getLogger('mqtt_client')
12 10
13 class MQTTService(ClientService): 11 class MQTTService(ClientService):
14 12
15 def __init__(self, endpoint, factory, observersByTopic): 13 def __init__(self, endpoint, factory, observersByTopic, clientId):
16 self.endpoint = endpoint 14 self.endpoint = endpoint
17 self.observersByTopic = observersByTopic 15 self.observersByTopic = observersByTopic
16 self.clientId = clientId
18 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5) 17 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)
19 18
20 def startService(self): 19 def startService(self):
21 self.whenConnected().addCallback(self.connectToBroker) 20 self.whenConnected().addCallback(self.connectToBroker)
22 ClientService.startService(self) 21 ClientService.startService(self)
42 41
43 # Publish requests beyond window size are enqueued 42 # Publish requests beyond window size are enqueued
44 self.protocol.setWindowSize(1) 43 self.protocol.setWindowSize(1)
45 44
46 try: 45 try:
47 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) 46 yield self.protocol.connect(self.clientId, keepalive=60)
48 except Exception as e: 47 except Exception as e:
49 log.error(f"Connecting to {self.endpoint} raised {e!s}") 48 log.error(f"Connecting to {self.endpoint} raised {e!s}")
50 return 49 return
51 50
52 log.info(f"Connected to {self.endpoint}") 51 log.info(f"Connected to {self.endpoint}")
73 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0, 72 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
74 message=bytearray(msg)).addErrback(_logFailure) 73 message=bytearray(msg)).addErrback(_logFailure)
75 74
76 75
77 class MqttClient(object): 76 class MqttClient(object):
78 def __init__(self, brokerHost='bang', brokerPort=1883): 77 def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
79 78
80 self.observersByTopic = {} # bytes: Set(observer) 79 self.observersByTopic = {} # bytes: Set(observer)
81 80
82 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) 81 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
83 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) 82 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
84 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) 83 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
85 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic) 84 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic,
85 clientId)
86 self.serv.startService() 86 self.serv.startService()
87 87
88 def publish(self, topic: bytes, msg: bytes): 88 def publish(self, topic: bytes, msg: bytes):
89 return self.serv.publish(topic, msg) 89 return self.serv.publish(topic, msg)
90 90