Mercurial > code > home > repos > homeauto
annotate lib/mqtt_client.py @ 1334:73c2b13692b7
rewrite dhcpleases to use dnsmasq's data files, and all the new build stuff
Ignore-this: 2dffc17b676253d4fec234fc096db4d5
darcs-hash:be3903ed924e998edd207ac4d8ae336bc64243ba
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 23 Apr 2019 02:56:07 -0700 |
parents | 6561367aa60a |
children |
rev | line source |
---|---|
1183
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 from mqtt.client.factory import MQTTFactory |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 from twisted.application.internet import ClientService, backoffPolicy |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from twisted.internet.defer import inlineCallbacks, Deferred |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 from twisted.internet.endpoints import clientFromString |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 import logging |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 from twisted.internet import reactor |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 from rx.concurrency import TwistedScheduler |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 from rx import Observable |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
9 |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
10 log = logging.getLogger('mqtt_client') |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
11 |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class MQTTService(ClientService):
    """Twisted ``ClientService`` that opens an MQTT session on each connection.

    The service connects to ``endpoint`` using ``factory`` and retries with
    Twisted's ``backoffPolicy()``. Once a transport connection is available,
    ``connectToBroker`` performs the MQTT-level connect. If an
    ``onMqttConnectionMade`` attribute has been set on the instance, it is
    called (with no arguments) after each successful MQTT connect.
    """

    def __init__(self, endpoint, factory):
        """Store ``endpoint`` for log messages and set up the retrying service."""
        self.endpoint = endpoint
        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())

    def startService(self):
        """Queue the MQTT handshake for the first connection, then start."""
        self.whenConnected().addCallback(self.connectToBroker)
        ClientService.startService(self)

    @inlineCallbacks
    def connectToBroker(self, protocol):
        """Run the MQTT connect on ``protocol`` and notify the optional hook.

        A failed connect is logged and swallowed; the hook is only called
        when the connect succeeds.
        """
        self.protocol = protocol
        self.protocol.onDisconnection = self.onDisconnection
        # A window size of 1 keeps publishes in order; publish requests
        # beyond the window size are enqueued.
        self.protocol.setWindowSize(1)

        try:
            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
        except Exception as e:
            # `log` is a stdlib logger: it takes %-style positional args and
            # would raise TypeError on arbitrary keyword arguments.
            log.error("Connecting to %s raised %s", self.endpoint, e)
        else:
            log.info("Connected to %s", self.endpoint)
            on_connected = getattr(self, 'onMqttConnectionMade', None)
            if on_connected:
                on_connected()

    def onDisconnection(self, reason):
        """Log the lost connection and re-arm the MQTT handshake."""
        log.warning("Connection to broker lost: %r", reason)
        self.whenConnected().addCallback(self.connectToBroker)

    def publish(self, topic, msg):
        """Publish ``msg`` to ``topic`` at QoS 0.

        Returns the Deferred from the protocol's ``publish``. Failures are
        logged and then passed on to the caller's errbacks. ``self.protocol``
        is only assigned in ``connectToBroker``, so calling this before the
        first connection raises ``AttributeError``.
        """
        def _logFailure(failure):
            log.warning("publish failed: %s", failure.getErrorMessage())
            return failure

        d = self.protocol.publish(topic=topic, qos=0, message=msg)
        return d.addErrback(_logFailure)
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
51 |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
52 |
6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class MqttClient(object):
    """Small publish/subscribe facade over an ``MQTTService``.

    Only one subscription and one observer are tracked at a time: each call
    to ``subscribe`` replaces the reconnect hook, and each new observer
    replaces the previous one.
    """

    def __init__(self, brokerHost='bang', brokerPort=1883):
        """Create the service for ``brokerHost:brokerPort`` and start it."""
        # No observer exists until something subscribes to the Observable
        # returned by subscribe(); _onPublish checks for this.
        self.obs = None

        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
        myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
        # HACK: this replaces __repr__ on the endpoint's *class*, so every
        # endpoint of that class is affected. It gives readable
        # "host:port" text in the service's log messages.
        myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
        self.serv = MQTTService(myEndpoint, factory)
        self.serv.startService()

    def publish(self, topic, msg):
        """Publish ``msg`` to ``topic``; returns the service's Deferred."""
        return self.serv.publish(topic, msg)

    def subscribe(self, topic):
        """returns rx.Observable of payload strings"""
        # This is surely broken for multiple topics and subscriptions. Might
        # not even work over a reconnect.
        ret = Observable.create(self._observe_msgs)

        # Subscribe again after every successful MQTT connect.
        self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
        # If already connected, the hook above will not fire until the next
        # reconnect, so subscribe right away.
        protocol = getattr(self.serv, 'protocol', None)
        if protocol is not None and protocol.state == protocol.CONNECTED:
            self._resubscribe(topic)
        return ret

    def _resubscribe(self, topic):
        """Route incoming publishes to this client and subscribe at QoS 2."""
        log.info('subscribing %r', topic)
        self.serv.protocol.onPublish = self._onPublish
        return self.serv.protocol.subscribe(topic, 2)

    def _observe_msgs(self, observer):
        """Observable subscribe function: remember ``observer`` for payloads."""
        self.obs = observer

    def _onPublish(self, topic, payload, qos, dup, retain, msgId):
        """Forward a received ``payload`` to the current observer, if any."""
        log.debug('received payload %r', payload)
        observer = getattr(self, 'obs', None)
        if observer is None:
            # The broker subscription can be active before anything has
            # subscribed to the Observable; drop the message, don't crash.
            log.warning('no observer for message on %r; dropping it', topic)
            return
        observer.on_next(payload)