Mercurial > code > home > repos > homeauto
comparison lib/mqtt_client/mqtt_client.py @ 1385:c887b1cc5e83
caller must set clientId
Ignore-this: b6c0f4f20697fa177d42c568269b1c8f
darcs-hash:b92509836ada530f47ec0e7d9148ee166bffe23b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 10 May 2019 01:31:21 -0700 |
parents | f883166f7ca1 |
children | 8d165cd29a5b |
comparison
equal
deleted
inserted
replaced
1384:a29a55f3429c | 1385:c887b1cc5e83 |
---|---|
1 import logging | 1 import logging |
2 from mqtt.client.factory import MQTTFactory | 2 from mqtt.client.factory import MQTTFactory |
3 from rx import Observable | |
4 from rx.subjects import Subject | 3 from rx.subjects import Subject |
5 from rx.concurrency import TwistedScheduler | |
6 from twisted.application.internet import ClientService | 4 from twisted.application.internet import ClientService |
7 from twisted.internet import reactor | 5 from twisted.internet import reactor |
8 from twisted.internet.defer import inlineCallbacks, Deferred | 6 from twisted.internet.defer import inlineCallbacks |
9 from twisted.internet.endpoints import clientFromString | 7 from twisted.internet.endpoints import clientFromString |
10 | 8 |
11 log = logging.getLogger('mqtt_client') | 9 log = logging.getLogger('mqtt_client') |
12 | 10 |
13 class MQTTService(ClientService): | 11 class MQTTService(ClientService): |
14 | 12 |
15 def __init__(self, endpoint, factory, observersByTopic): | 13 def __init__(self, endpoint, factory, observersByTopic, clientId): |
16 self.endpoint = endpoint | 14 self.endpoint = endpoint |
17 self.observersByTopic = observersByTopic | 15 self.observersByTopic = observersByTopic |
| | 16 self.clientId = clientId |
18 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5) | 17 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5) |
19 | 18 |
20 def startService(self): | 19 def startService(self): |
21 self.whenConnected().addCallback(self.connectToBroker) | 20 self.whenConnected().addCallback(self.connectToBroker) |
22 ClientService.startService(self) | 21 ClientService.startService(self) |
42 | 41 |
43 # Publish requests beyond window size are enqueued | 42 # Publish requests beyond window size are enqueued |
44 self.protocol.setWindowSize(1) | 43 self.protocol.setWindowSize(1) |
45 | 44 |
46 try: | 45 try: |
47 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) | 46 yield self.protocol.connect(self.clientId, keepalive=60) |
48 except Exception as e: | 47 except Exception as e: |
49 log.error(f"Connecting to {self.endpoint} raised {e!s}") | 48 log.error(f"Connecting to {self.endpoint} raised {e!s}") |
50 return | 49 return |
51 | 50 |
52 log.info(f"Connected to {self.endpoint}") | 51 log.info(f"Connected to {self.endpoint}") |
73 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0, | 72 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0, |
74 message=bytearray(msg)).addErrback(_logFailure) | 73 message=bytearray(msg)).addErrback(_logFailure) |
75 | 74 |
76 | 75 |
77 class MqttClient(object): | 76 class MqttClient(object): |
78 def __init__(self, brokerHost='bang', brokerPort=1883): | 77 def __init__(self, clientId, brokerHost='bang', brokerPort=1883): |
79 | 78 |
80 self.observersByTopic = {} # bytes: Set(observer) | 79 self.observersByTopic = {} # bytes: Set(observer) |
81 | 80 |
82 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) | 81 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) |
83 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) | 82 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) |
84 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) | 83 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) |
85 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic) | 84 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic, |
| | 85 clientId) |
86 self.serv.startService() | 86 self.serv.startService() |
87 | 87 |
88 def publish(self, topic: bytes, msg: bytes): | 88 def publish(self, topic: bytes, msg: bytes): |
89 return self.serv.publish(topic, msg) | 89 return self.serv.publish(topic, msg) |
90 | 90 |