annotate lib/mqtt_client/mqtt_client.py @ 580:9d60d3f34ddc

release 0.5.0 Ignore-this: 1ccfaf82e1b7a9d0fe4eda652e27a3dd
author drewp@bigasterisk.com
date Wed, 08 May 2019 00:56:54 -0700
parents 603272ee3000
children 6b6a7d06691e
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
554
2c4e8ef57f08 mqtt_client into a distributable
drewp@bigasterisk.com
parents: 378
diff changeset
1 import logging
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
2 from mqtt.client.factory import MQTTFactory
554
2c4e8ef57f08 mqtt_client into a distributable
drewp@bigasterisk.com
parents: 378
diff changeset
3 from rx import Observable
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
4 from rx.subjects import Subject
554
2c4e8ef57f08 mqtt_client into a distributable
drewp@bigasterisk.com
parents: 378
diff changeset
5 from rx.concurrency import TwistedScheduler
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
6 from twisted.application.internet import ClientService
554
2c4e8ef57f08 mqtt_client into a distributable
drewp@bigasterisk.com
parents: 378
diff changeset
7 from twisted.internet import reactor
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
8 from twisted.internet.defer import inlineCallbacks, Deferred
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
9 from twisted.internet.endpoints import clientFromString
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
10
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
11 log = logging.getLogger('mqtt_client')
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
12
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
13 class MQTTService(ClientService):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
14
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
15 def __init__(self, endpoint, factory, observersByTopic):
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
16 self.endpoint = endpoint
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
17 self.observersByTopic = observersByTopic
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
18 ClientService.__init__(self, endpoint, factory, retryPolicy=lambda _: 5)
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
19
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
20 def startService(self):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
21 self.whenConnected().addCallback(self.connectToBroker)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
22 ClientService.startService(self)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
23
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
24 def ensureSubscribed(self, topic: bytes):
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
25 self.whenConnected().addCallback(self._subscribeToLatestTopic, topic)
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
26
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
27 def _subscribeToLatestTopic(self, protocol, topic: bytes):
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
28 if protocol.state == protocol.CONNECTED:
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
29 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2)])
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
30 # else it'll get done in the next connectToBroker.
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
31
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
32 def _subscribeAll(self):
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
33 topics = list(self.observersByTopic)
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
34 log.info('subscribing %r', topics)
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
35 self.protocol.subscribe(topics=[(topic.decode('utf8'), 2) for topic in topics])
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
36
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
37
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
38 @inlineCallbacks
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
39 def connectToBroker(self, protocol):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
40 self.protocol = protocol
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
41 self.protocol.onDisconnection = self._onProtocolDisconnection
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
42
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
43 # Publish requests beyond window size are enqueued
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
44 self.protocol.setWindowSize(1)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
45
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
46 try:
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
47 yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
48 except Exception as e:
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
49 log.error(f"Connecting to {self.endpoint} raised {e!s}")
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
50 return
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
51
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
52 log.info(f"Connected to {self.endpoint}")
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
53
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
54 self.protocol.onPublish = self._onProtocolMessage
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
55 self._subscribeAll()
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
56
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
57 def _onProtocolMessage(self, topic, payload, qos, dup, retain, msgId):
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
58 topic = topic.encode('ascii')
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
59 observers = self.observersByTopic.get(topic, [])
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
60 log.debug(f'received {topic} payload {payload} ({len(observers)} obs)')
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
61 for obs in observers:
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
62 obs.on_next(payload)
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
63
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
64 def _onProtocolDisconnection(self, reason):
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
65 log.warn("Connection to broker lost: %r", reason)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
66 self.whenConnected().addCallback(self.connectToBroker)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
67
556
f263f76106aa some py3 compatibility
drewp@bigasterisk.com
parents: 554
diff changeset
68 def publish(self, topic: bytes, msg: bytes):
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
69 def _logFailure(failure):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
70 log.warn("publish failed: %s", failure.getErrorMessage())
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
71 return failure
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
72
560
3501410e4cc7 also pass topic as str at publish
drewp@bigasterisk.com
parents: 558
diff changeset
73 return self.protocol.publish(topic=topic.decode('utf-8'), qos=0,
556
f263f76106aa some py3 compatibility
drewp@bigasterisk.com
parents: 554
diff changeset
74 message=bytearray(msg)).addErrback(_logFailure)
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
75
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
76
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
77 class MqttClient(object):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
78 def __init__(self, brokerHost='bang', brokerPort=1883):
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
79
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
80 self.observersByTopic = {} # bytes: Set(observer)
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
81
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
82 factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
83 myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
84 myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
85 self.serv = MQTTService(myEndpoint, factory, self.observersByTopic)
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
86 self.serv.startService()
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
87
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
88 def publish(self, topic: bytes, msg: bytes):
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
89 return self.serv.publish(topic, msg)
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
90
556
f263f76106aa some py3 compatibility
drewp@bigasterisk.com
parents: 554
diff changeset
91 def subscribe(self, topic: bytes):
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
92 """returns rx.Observable of payload strings"""
579
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
93 ret = Subject()
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
94 self.observersByTopic.setdefault(topic, set()).add(ret)
603272ee3000 big rewrite. now probably works for multiple subscriptions and over reconnects
drewp@bigasterisk.com
parents: 560
diff changeset
95 self.serv.ensureSubscribed(topic)
378
b90d9321d2ce factor common mqtt code out of mqtt_graph_bridge
drewp@bigasterisk.com
parents:
diff changeset
96 return ret