Mercurial > code > home > repos > homeauto
annotate lib/mqtt_client/mqtt_client.py @ 1385:c887b1cc5e83
caller must set clientId
Ignore-this: b6c0f4f20697fa177d42c568269b1c8f
darcs-hash:b92509836ada530f47ec0e7d9148ee166bffe23b
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Fri, 10 May 2019 01:31:21 -0700 |
parents | f883166f7ca1 |
children | 8d165cd29a5b |
rev | line source |
---|---|
1357 | 1 import logging |
2 from mqtt.client.factory import MQTTFactory | |
1382
f883166f7ca1
big rewrite. now probably works for multiple subscriptions and over reconnects
drewp <drewp@bigasterisk.com>
parents:
1363
diff
changeset
|
3 from rx.subjects import Subject |
f883166f7ca1
big rewrite. now probably works for multiple subscriptions and over reconnects
drewp <drewp@bigasterisk.com>
parents:
1363
diff
changeset
|
4 from twisted.application.internet import ClientService |
1357 | 5 from twisted.internet import reactor |
1385 | 6 from twisted.internet.defer import inlineCallbacks |
1357 | 7 from twisted.internet.endpoints import clientFromString |
8 | |
9 log = logging.getLogger('mqtt_client') | |
10 | |
class MQTTService(ClientService):
    """ClientService that maintains an MQTT session and dispatches messages.

    The service connects ``factory`` over ``endpoint``, performs the MQTT
    CONNECT handshake with ``clientId`` and subscribes to every topic that
    is a key of ``observersByTopic``. Incoming messages are forwarded to
    the observers registered for their topic.
    """

    def __init__(self, endpoint, factory, observersByTopic, clientId):
        """Store the configuration and set up the underlying ClientService.

        endpoint: client endpoint used to reach the broker.
        factory: protocol factory handed to ClientService.
        observersByTopic: mapping of topic (bytes) to a collection of
            observers; the mapping is kept by reference, so topics added
            by the owner later are seen here too.
        clientId: MQTT client id sent when connecting.
        """
        self.endpoint = endpoint
        self.observersByTopic = observersByTopic
        self.clientId = clientId
        # Assigned by connectToBroker once a transport connection exists.
        self.protocol = None
        # Constant retry delay: try again 5 seconds after each failure.
        super().__init__(endpoint, factory, retryPolicy=lambda _: 5)

    def startService(self):
        """Start connecting; connectToBroker runs once a connection is up."""
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    def ensureSubscribed(self, topic: bytes):
        """Subscribe to ``topic`` as soon as a connection is available."""
        self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)

    def _subscribeToLatestTopic(self, protocol, topic: bytes):
        """Subscribe ``protocol`` to ``topic`` if its MQTT session is up."""
        if protocol.state == protocol.CONNECTED:
            # Use the protocol whose state was just checked rather than
            # self.protocol, so the check and the call cannot disagree.
            protocol.subscribe(topics=[(topic.decode('utf8'), 2)])
        # else it'll get done in the next connectToBroker.

    def _subscribeAll(self):
        """Subscribe (QoS 2) to every topic currently in observersByTopic."""
        topics = list(self.observersByTopic)
        if not topics:
            # A SUBSCRIBE packet must carry at least one topic filter, so
            # there is nothing valid to send before the first subscription.
            log.info('no topics to subscribe yet')
            return
        log.info('subscribing %r', topics)
        self.protocol.subscribe(
            topics=[(topic.decode('utf8'), 2) for topic in topics])

    @inlineCallbacks
    def connectToBroker(self, protocol):
        """Run the MQTT handshake on ``protocol`` and (re)subscribe.

        Connection errors are logged and swallowed; in that case no
        subscriptions are made on this protocol.
        """
        self.protocol = protocol
        self.protocol.onDisconnection = self._onProtocolDisconnection

        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect(self.clientId, keepalive=60)
        except Exception as e:
            log.error("Connecting to %s raised %s", self.endpoint, e)
            return

        log.info("Connected to %s", self.endpoint)

        self.protocol.onPublish = self._onProtocolMessage
        self._subscribeAll()

    def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
        """Forward ``payload`` to every observer registered for ``topic``."""
        # Subscriptions decode topic keys as UTF-8, so encode the same way
        # here; ASCII would raise on any non-ASCII topic name.
        topic = topic.encode('utf8')
        # Snapshot the observers: an observer may register a new
        # subscription from on_next, which mutates the underlying set.
        observers = list(self.observersByTopic.get(topic, ()))
        log.debug('received %s payload %s (%s obs)',
                  topic, payload, len(observers))
        for obs in observers:
            obs.on_next(payload)

    def _onProtocolDisconnection(self, reason):
        """Log the lost connection and redo the handshake on reconnect."""
        log.warning("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic: bytes, msg: bytes):
        """Publish ``msg`` on ``topic`` at QoS 0.

        Returns what the protocol's publish() returns, with an errback
        attached that logs the failure and passes it on. Calling this
        before the first connection raises AttributeError because no
        protocol is available yet.
        """
        def _logFailure(failure):
            log.warning("publish failed: %s", failure.getErrorMessage())
            return failure

        return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
                                     message=bytearray(msg)).addErrback(_logFailure)
1357 | 74 |
75 | |
class MqttClient:
    """Convenience front end: one MQTTService plus rx-based subscriptions."""

    def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
        """Create the service for ``brokerHost:brokerPort`` and start it.

        clientId: MQTT client id, passed through to MQTTService.
        brokerHost, brokerPort: TCP address of the broker.
        """
        # topic (bytes) -> set of observers; shared with the service below.
        self.observersByTopic = {}

        profile = MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER
        protocolFactory = MQTTFactory(profile=profile)

        description = 'tcp:%s:%s' % (brokerHost, brokerPort)
        brokerEndpoint = clientFromString(reactor, description)
        # NOTE(review): this replaces __repr__ on the endpoint's *class*,
        # so every endpoint of that type in the process is affected.
        brokerEndpoint.__class__.__repr__ = (
            lambda self: repr('%s:%s' % (self._host, self._port)))

        self.serv = MQTTService(
            brokerEndpoint,
            protocolFactory,
            self.observersByTopic,
            clientId,
        )
        self.serv.startService()

    def publish(self, topic: bytes, msg: bytes):
        """Publish ``msg`` on ``topic``; returns the service's result."""
        service = self.serv
        return service.publish(topic, msg)

    def subscribe(self, topic: bytes):
        """Register a new observer for ``topic`` and return it.

        The returned rx Subject receives each payload delivered for the
        topic, exactly as the service hands it over.
        """
        subject = Subject()
        topicObservers = self.observersByTopic.setdefault(topic, set())
        topicObservers.add(subject)
        self.serv.ensureSubscribed(topic)
        return subject