Mercurial > code > home > repos > homeauto
annotate lib/mqtt_client/mqtt_client.py @ 1655:15e195fb24bb
new stmt_chunk_test
author | drewp@bigasterisk.com |
---|---|
date | Sun, 19 Sep 2021 13:21:33 -0700 |
parents | bad87b7dc608 |
children |
rev | line source |
---|---|
554 | 1 import logging |
378
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
2 from mqtt.client.factory import MQTTFactory |
645 | 3 import rx.subject |
579
603272ee3000
big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents:
560
diff
changeset
|
4 from twisted.application.internet import ClientService |
554 | 5 from twisted.internet import reactor |
582 | 6 from twisted.internet.defer import inlineCallbacks |
378
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
7 from twisted.internet.endpoints import clientFromString |
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
8 |
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
# Module-wide logger for the MQTT client helpers.
log = logging.getLogger('mqtt_client')

# MQTT quality-of-service levels; AT_LEAST_ONCE is what this module requests
# for each subscription.
AT_MOST_ONCE = 0
AT_LEAST_ONCE = 1
EXACTLY_ONCE = 2
378
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
11 |
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
class MQTTService(ClientService):
    """ClientService that runs an MQTT session against one broker.

    On every (re)connection it performs the MQTT connect handshake,
    installs the message handler, and subscribes to every topic currently
    present in ``observersByTopic``. Incoming messages are fanned out to the
    observers registered for the message's topic.
    """

    def __init__(self, endpoint, factory, observersByTopic, clientId):
        """
        endpoint: client endpoint for the broker (also used in log messages).
        factory: protocol factory handed to ClientService.
        observersByTopic: mapping of topic (bytes) to a collection of
            observers with an ``on_next(payload)`` method. The mapping is
            kept by reference, so topics added later by the owner are seen
            here.
        clientId: client id passed to ``protocol.connect``.
        """
        self.endpoint = endpoint
        self.observersByTopic = observersByTopic
        self.clientId = clientId
        # The retry policy ignores the failure count and always waits 5.
        ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)

    def startService(self):
        """Queue the broker handshake for the first connection, then start."""
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    def ensureSubscribed(self, topic: bytes):
        """Subscribe to `topic` once a connection is available."""
        self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)

    def _subscribeToLatestTopic(self, protocol, topic: bytes):
        """Subscribe `protocol` to `topic` if its MQTT session is up.

        The subscribe goes to the same `protocol` object whose state was
        just checked, rather than to self.protocol, which may not be set yet
        or may refer to another connection.
        """
        if protocol.state == protocol.CONNECTED:
            protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE)])
        # else it'll get done in the next connectToBroker.

    def _subscribeAll(self):
        """Subscribe the current protocol to every topic that has observers."""
        topics = list(self.observersByTopic)
        if not topics:
            # An empty subscribe request makes the broker hang up on us.
            return
        log.info('subscribing %r', topics)
        self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE) for topic in topics])

    @inlineCallbacks
    def connectToBroker(self, protocol):
        """Run the MQTT handshake on a new connection and resubscribe.

        If ``protocol.connect`` raises, the error is logged and nothing else
        happens here (no handler is installed and no topics are subscribed).
        """
        self.protocol = protocol
        self.protocol.onDisconnection = self._onProtocolDisconnection

        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect(self.clientId, keepalive=60)
        except Exception as e:
            log.error(f"Connecting to {self.endpoint} raised {e!s}")
            return

        log.info(f"Connected to {self.endpoint}")

        self.protocol.onPublish = self._onProtocolMessage
        self._subscribeAll()

    def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
        """Deliver `payload` to every observer registered for `topic`."""
        # Keys of observersByTopic are bytes that get decoded as utf8 when
        # subscribing, so encode with the same codec to find them again.
        topic = topic.encode('utf8')
        observers = self.observersByTopic.get(topic, [])
        log.debug(f'received {topic} payload {payload} ({len(observers)} obs)')
        # Iterate over a snapshot: an observer may register another observer
        # for this topic while it is being notified.
        for obs in list(observers):
            obs.on_next(payload)

    def _onProtocolDisconnection(self, reason):
        """Log the loss and queue a new handshake for the next connection."""
        log.warning("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic: bytes, msg: bytes):
        """Publish `msg` on `topic` with QoS 0.

        Returns the result of ``protocol.publish`` with an errback attached
        that logs the failure and passes it on to the caller.

        NOTE: self.protocol is only assigned in connectToBroker, so calling
        this before the first connection raises AttributeError.
        """
        def _logFailure(failure):
            log.warning("publish failed: %s", failure.getErrorMessage())
            return failure

        return self.protocol.publish(topic=topic.decode('utf-8'), qos=AT_MOST_ONCE,
                                     message=bytearray(msg)).addErrback(_logFailure)
378
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
77 |
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
78 |
b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff
changeset
|
class MqttClient(object):
    """Publish/subscribe facade over an MQTTService that it creates and starts."""

    def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
        """Build the service for the broker at brokerHost:brokerPort and start it."""
        # topic (bytes) -> set of subjects; shared by reference with the service.
        self.observersByTopic = {}

        profile = MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER
        factory = MQTTFactory(profile=profile)

        description = 'tcp:%s:%s' % (brokerHost, brokerPort)
        brokerEndpoint = clientFromString(reactor, description)
        # HACK: this replaces __repr__ on the endpoint's class, not just on
        # this instance, so every endpoint of that class gets the host:port
        # repr used in the service's log messages.
        brokerEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))

        self.serv = MQTTService(brokerEndpoint, factory, self.observersByTopic,
                                clientId)
        self.serv.startService()

    def publish(self, topic: bytes, msg: bytes):
        """Forward to the service's publish and return its result."""
        return self.serv.publish(topic, msg)

    def subscribe(self, topic: bytes):
        """returns rx.Observable of payload strings"""
        subject = rx.subject.Subject()
        watchers = self.observersByTopic.get(topic)
        if watchers is None:
            # First subscriber for this topic: create its observer set.
            watchers = self.observersByTopic[topic] = set()
        watchers.add(subject)
        self.serv.ensureSubscribed(topic)
        return subject