annotate lib/mqtt_client/mqtt_client.py @ 1446:ef03d25cc815

fix import for rx 3.x Ignore-this: a8944e85d51997a853c90ed1893eb1f6 darcs-hash:216b8893da0be687a7150c14030052bdd67625cd
author drewp <drewp@bigasterisk.com>
date Wed, 25 Sep 2019 16:08:43 -0700
parents 8d165cd29a5b
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1357
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
1 import logging
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
2 from mqtt.client.factory import MQTTFactory
1446
ef03d25cc815 fix import for rx 3.x
drewp <drewp@bigasterisk.com>
parents: 1387
diff changeset
3 import rx.subject
1382
f883166f7ca1 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp <drewp@bigasterisk.com>
parents: 1363
diff changeset
4 from twisted.application.internet import ClientService
1357
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
5 from twisted.internet import reactor
1385
c887b1cc5e83 caller must set clientId
drewp <drewp@bigasterisk.com>
parents: 1382
diff changeset
6 from twisted.internet.defer import inlineCallbacks
1357
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
7 from twisted.internet.endpoints import clientFromString
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
8
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
9 log = logging.getLogger('mqtt_client')
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
10
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
class MQTTService(ClientService):
    """ClientService that runs an MQTT session and fans messages out to observers.

    ``observersByTopic`` maps a topic (bytes) to a collection of observers
    exposing ``on_next``. The mapping is shared with the caller, which may
    add topics at any time; it is re-read on every (re)connect and on every
    received message. Lookup is an exact match on the topic bytes, so
    wildcard subscriptions are not expanded here.
    """

    def __init__(self, endpoint, factory, observersByTopic, clientId):
        """Store the session parameters and initialise the ClientService.

        endpoint: client endpoint handed to ClientService (also used in logs).
        factory: protocol factory handed to ClientService.
        observersByTopic: shared mapping of topic bytes -> observers.
        clientId: identifier passed to ``protocol.connect``.
        """
        self.endpoint = endpoint
        self.observersByTopic = observersByTopic
        self.clientId = clientId
        # Constant retry policy: always return 5, whatever argument the
        # policy is called with.
        ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)

    def startService(self):
        """Queue the broker handshake for the first connection, then start."""
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    def ensureSubscribed(self, topic: bytes):
        """Subscribe to ``topic`` once a connection is available."""
        self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)

    def _subscribeToLatestTopic(self, protocol, topic: bytes):
        """Subscribe ``protocol`` to ``topic`` if its MQTT session is up."""
        if protocol.state == protocol.CONNECTED:
            # Subscribe on the same protocol object whose state was checked.
            protocol.subscribe(topics=[(topic.decode('utf8'), 2)])
        # else it'll get done in the next connectToBroker.

    def _subscribeAll(self):
        """Subscribe the current protocol to every topic that has observers."""
        topics = list(self.observersByTopic)
        if not topics:
            # An empty subscribe request makes the broker hang up on us.
            return
        log.info('subscribing %r', topics)
        self.protocol.subscribe(
            topics=[(topic.decode('utf8'), 2) for topic in topics])

    @inlineCallbacks
    def connectToBroker(self, protocol):
        """Perform the MQTT connect on ``protocol`` and (re)subscribe.

        Connection errors are logged and swallowed; no subscriptions are
        made in that case.
        """
        self.protocol = protocol
        self.protocol.onDisconnection = self._onProtocolDisconnection

        # Publish requests beyond window size are enqueued
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect(self.clientId, keepalive=60)
        except Exception as e:
            log.error("Connecting to %s raised %s", self.endpoint, e)
            return

        log.info("Connected to %s", self.endpoint)

        self.protocol.onPublish = self._onProtocolMessage
        self._subscribeAll()

    def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
        """Forward ``payload`` to every observer registered for ``topic``."""
        # Topics are decoded with utf8 when subscribing, so encode with the
        # same codec here; otherwise non-ASCII topics raise and never match.
        topic = topic.encode('utf8')
        observers = self.observersByTopic.get(topic, [])
        log.debug('received %s payload %s (%s obs)',
                  topic, payload, len(observers))
        # Iterate a snapshot: an observer may register another observer for
        # this topic from inside on_next, which would mutate the live set.
        for obs in tuple(observers):
            obs.on_next(payload)

    def _onProtocolDisconnection(self, reason):
        """Log the loss and queue a new handshake for the next connection."""
        log.warning("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic: bytes, msg: bytes):
        """Publish ``msg`` on ``topic`` at qos 0.

        Returns whatever ``protocol.publish(...).addErrback(...)`` returns.
        Failures are logged and then passed on to later errbacks.
        Requires ``connectToBroker`` to have run, since it sets
        ``self.protocol``.
        """
        def _logFailure(failure):
            log.warning("publish failed: %s", failure.getErrorMessage())
            return failure

        return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
                                     message=bytearray(msg)).addErrback(_logFailure)
1357
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
76
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
77
f99fe03803d4 mqtt_client into a distributable
drewp <drewp@bigasterisk.com>
parents:
diff changeset
class MqttClient:
    """Facade that owns one started MQTTService for a single broker."""

    def __init__(self, clientId, brokerHost='bang', brokerPort=1883):
        """Create the service for ``brokerHost:brokerPort`` and start it."""
        # topic (bytes) -> set of observers; shared with the service below
        self.observersByTopic = {}

        mqttFactory = MQTTFactory(
            profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
        brokerEndpoint = clientFromString(
            reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))

        def _hostPortRepr(ep):
            return repr('%s:%s' % (ep._host, ep._port))

        # NOTE: this replaces __repr__ on the endpoint's class, not only on
        # this instance.
        brokerEndpoint.__class__.__repr__ = _hostPortRepr

        self.serv = MQTTService(
            brokerEndpoint, mqttFactory, self.observersByTopic, clientId)
        self.serv.startService()

    def publish(self, topic: bytes, msg: bytes):
        """Delegate to the service's ``publish`` and return its result."""
        result = self.serv.publish(topic, msg)
        return result

    def subscribe(self, topic: bytes):
        """returns rx.Observable of payload strings"""
        subject = rx.subject.Subject()
        observers = self.observersByTopic.get(topic)
        if observers is None:
            observers = self.observersByTopic[topic] = set()
        observers.add(subject)
        self.serv.ensureSubscribed(topic)
        return subject