annotate mqtt_client.py @ 0:834594523aa4

move from homeauto repo
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 10:06:04 -0800
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
1 import logging
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
2 from mqtt.client.factory import MQTTFactory
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
3 import rx.subject
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
4 from twisted.application.internet import ClientService
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
5 from twisted.internet import reactor
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
6 from twisted.internet.defer import inlineCallbacks
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
7 from twisted.internet.endpoints import clientFromString
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
8
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
9 log = logging.getLogger('mqtt_client')
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
10 AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE = 0, 1, 2
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
11
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
12 class MQTTService(ClientService):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
13
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
14 def __init__(self, endpoint, factory, observersByTopic, clientId):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
15 self.endpoint = endpoint
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
16 self.observersByTopic = observersByTopic
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
17 self.clientId = clientId
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
18 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
19
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
20 def startService(self):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
21 self.whenConnected().addCallback(self.connectToBroker)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
22 ClientService.startService(self)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
23
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
24 def ensureSubscribed(self, topic: bytes):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
25 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
26
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
27 def _subscribeToLatestTopic(self, protocol, topic: bytes):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
28 if protocol.state == protocol.CONNECTED:
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
29 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE)])
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
30 # else it'll get done in the next connectToBroker.
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
31
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
32 def _subscribeAll(self):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
33 topics = list(self.observersByTopic)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
34 if not topics:
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
35 return
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
36 log.info('subscribing %r', topics)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
37 self.protocol.subscribe(topics=[(topic.decode('utf8'), AT_LEAST_ONCE) for topic in topics])
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
38
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
39
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
40 @inlineCallbacks
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
41 def connectToBroker(self, protocol):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
42 self.protocol = protocol
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
43 self.protocol.onDisconnection = self._onProtocolDisconnection
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
44
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
45 # Publish requests beyond window size are enqueued
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
46 self.protocol.setWindowSize(1)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
47
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
48 try:
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
49 yield self.protocol.connect(self.clientId, keepalive=60)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
50 except Exception as e:
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
51 log.error(f"Connecting to {self.endpoint} raised {e!s}")
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
52 return
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
53
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
54 log.info(f"Connected to {self.endpoint}")
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
55
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
56 self.protocol.onPublish = self._onProtocolMessage
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
57 self._subscribeAll()
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
58
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
59 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
60 topic = topic.encode('ascii')
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
61 observers = self.observersByTopic.get(topic, [])
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
62 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)')
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
63 for obs in observers:
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
64 obs.on_next(payload)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
65
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
66 def _onProtocolDisconnection(self, reason):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
67 log.warn("Connection to broker lost: %r", reason)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
68 self.whenConnected().addCallback(self.connectToBroker)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
69
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
70 def publish(self, topic: bytes, msg: bytes):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
71 def _logFailure(failure):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
72 log.warn("publish failed: %s", failure.getErrorMessage())
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
73 return failure
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
74
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
75 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
76 message=bytearray(msg)).addErrback(_logFailure)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
77
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
78
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
79 class MqttClient(object):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
80 def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
81
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
82 self.observersByTopic = {} # bytes: Set(observer)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
83
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
84 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
85 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
86 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
87 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic,
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
88 clientId)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
89 self.serv.startService()
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
90
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
91 def publish(self, topic: bytes, msg: bytes):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
92 return self.serv.publish(topic, msg)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
93
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
94 def subscribe(self, topic: bytes):
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
95 """returns rx.Observable of payload strings"""
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
96 ret = rx.subject.Subject()
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
97 self.observersByTopic.setdefault(topic, set()).add(ret)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
98 self.serv.ensureSubscribed(topic)
834594523aa4 move from homeauto repo
drewp@bigasterisk.com
parents:
diff changeset
99 return ret