changeset 378:b90d9321d2ce

factor common mqtt code out of mqtt_graph_bridge Ignore-this: 21b54376d0b00f6709f1947c198cb4a8
author drewp@bigasterisk.com
date Wed, 12 Dec 2018 01:10:48 -0800
parents 5b690bfc31b2
children 67cebf7a14de
files lib/mqtt_client.py service/mqtt_graph_bridge/mqtt_graph_bridge.py service/mqtt_graph_bridge/requirements.txt
diffstat 3 files changed, 98 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/mqtt_client.py	Wed Dec 12 01:10:48 2018 -0800
@@ -0,0 +1,90 @@
+from mqtt.client.factory import MQTTFactory
+from twisted.application.internet import ClientService, backoffPolicy
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.internet.endpoints import clientFromString
+import logging
+from twisted.internet import reactor
+from rx.concurrency import TwistedScheduler
+from rx import Observable
+
+log = logging.getLogger('mqtt_client')
+
+class MQTTService(ClientService):
+
+    def __init__(self, endpoint, factory):
+        self.endpoint = endpoint
+        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
+
+    def startService(self):
+        self.whenConnected().addCallback(self.connectToBroker)
+        ClientService.startService(self)
+
+    @inlineCallbacks
+    def connectToBroker(self, protocol):
+        self.protocol = protocol
+        self.protocol.onDisconnection = self.onDisconnection
+        # We are issuing 3 publish in a row
+        # if order matters, then set window size to 1
+        # Publish requests beyond window size are enqueued
+        self.protocol.setWindowSize(1)
+
+        try:
+            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
+        except Exception as e:
+            log.error("Connecting to {broker} raised {excp!s}",
+                      broker=self.endpoint, excp=e)
+        else:
+            log.info("Connected to {broker}".format(broker=self.endpoint))
+        if getattr(self, 'onMqttConnectionMade', False):
+            self.onMqttConnectionMade()
+
+    def onDisconnection(self, reason):
+        log.warn("Connection to broker lost: %r", reason)
+        self.whenConnected().addCallback(self.connectToBroker)
+
+    def publish(self, topic, msg):
+        def _logFailure(failure):
+            log.warn("publish failed: %s", failure.getErrorMessage())
+            return failure
+
+        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
+
+
+class MqttClient(object):
+    def __init__(self, brokerHost='bang', brokerPort=1883):
+
+        #scheduler = TwistedScheduler(reactor)
+        
+        factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER)
+        myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort))
+        myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port))
+        self.serv = MQTTService(myEndpoint, factory)
+        self.serv.startService()
+        
+    def publish(self, topic, msg):
+        return self.serv.publish(topic, msg)
+
+    def subscribe(self, topic):
+        """returns rx.Observable of payload strings"""
+        # This is surely broken for multiple topics and subscriptions. Might not even
+        # work over a reconnect.
+        
+        ret = Observable.create(self._observe_msgs)
+
+        self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic)
+        if (hasattr(self.serv, 'protocol') and
+            self.serv.protocol.state ==self.serv.protocol.CONNECTED):
+            self._resubscribe(topic)
+        return ret
+
+    def _resubscribe(self, topic):
+        log.info('subscribing %r', topic)
+        self.serv.protocol.onPublish = self._onPublish
+        return self.serv.protocol.subscribe(topic, 2)
+        
+    def _observe_msgs(self, observer):
+        self.obs = observer
+
+    def _onPublish(self, topic, payload, qos, dup, retain, msgId):
+        log.debug('received payload %r', payload)
+        self.obs.on_next(payload)
--- a/service/mqtt_graph_bridge/mqtt_graph_bridge.py	Tue Dec 11 19:13:06 2018 -0800
+++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py	Wed Dec 12 01:10:48 2018 -0800
@@ -1,16 +1,12 @@
 from docopt import docopt
-from mqtt.client.factory import MQTTFactory
 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
 from rdflib import Namespace, URIRef, Literal, Graph
 from rdflib.parser import StringInputSource
-from twisted.application.internet import ClientService, backoffPolicy
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
-from twisted.internet.endpoints import clientFromString
 import cyclone.web
 import sys, logging
+from mqtt_client import MqttClient
 
-BROKER = "tcp:bang:1883"
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 devs = {
@@ -20,44 +16,6 @@
 logging.basicConfig()
 log = logging.getLogger()
 
-
-class MQTTService(ClientService):
-
-    def __init(self, endpoint, factory):
-        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
-
-    def startService(self):
-        self.whenConnected().addCallback(self.connectToBroker)
-        ClientService.startService(self)
-
-    @inlineCallbacks
-    def connectToBroker(self, protocol):
-        self.protocol = protocol
-        self.protocol.onDisconnection = self.onDisconnection
-        # We are issuing 3 publish in a row
-        # if order matters, then set window size to 1
-        # Publish requests beyond window size are enqueued
-        self.protocol.setWindowSize(1)
-
-        try:
-            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
-        except Exception as e:
-            log.error("Connecting to {broker} raised {excp!s}",
-                      broker=BROKER, excp=e)
-        else:
-            log.info("Connected to {broker}".format(broker=BROKER))
-
-    def onDisconnection(self, reason):
-        log.warn("Connection to broker lost: %r", reason)
-        self.whenConnected().addCallback(self.connectToBroker)
-
-    def publish(self, topic, msg):
-        def _logFailure(failure):
-            log.warn("publish failed: %s", failure.getErrorMessage())
-            return failure
-
-        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
-
 def rdfGraphBody(body, headers):
     g = Graph()
     g.parse(StringInputSource(body), format='nt')
@@ -85,7 +43,9 @@
         for dev, attrs in devs.items():
             if stmt[0:2] == (dev, ROOM['brightness']):
                 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON'
-                serv.publish("%s/w1/light/switch" % attrs['root'], sw)
+                self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw)
+                self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'],
+                                           '200,255,200' if sw == 'ON' else '0,0,0')
                 self.settings.masterGraph.patchObject(attrs['ctx'],
                                                       stmt[0], stmt[1], stmt[2])
                 return
@@ -105,9 +65,7 @@
 
     masterGraph = PatchableGraph()
 
-    factory    = MQTTFactory(profile=MQTTFactory.PUBLISHER)
-    myEndpoint = clientFromString(reactor, BROKER)
-    serv       = MQTTService(myEndpoint, factory)
+    mqtt = MqttClient(brokerPort=1883)
 
     port = 10008
     reactor.listenTCP(port, cyclone.web.Application([
@@ -115,12 +73,12 @@
         (r"/graph/events", CycloneGraphEventsHandler,
          {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
-        ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::')
+        ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::')
     log.warn('serving on %s', port)
 
     for dev, attrs in devs.items():
         masterGraph.patchObject(attrs['ctx'],
                                 dev, ROOM['brightness'], Literal(0.0))
     
-    serv.startService()
+    
     reactor.run()
--- a/service/mqtt_graph_bridge/requirements.txt	Tue Dec 11 19:13:06 2018 -0800
+++ b/service/mqtt_graph_bridge/requirements.txt	Wed Dec 12 01:10:48 2018 -0800
@@ -3,3 +3,4 @@
 rdflib==4.2.2
 twisted-mqtt==0.3.6
 https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz
+rx==1.6.1