diff service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1183:6561367aa60a

factor common mqtt code out of mqtt_graph_bridge Ignore-this: 21b54376d0b00f6709f1947c198cb4a8 darcs-hash:6d93c35ea6d305744693e3cb8366577b4183ca05
author drewp <drewp@bigasterisk.com>
date Wed, 12 Dec 2018 01:10:48 -0800
parents e3991af5bd39
children 79d041273e26
line wrap: on
line diff
--- a/service/mqtt_graph_bridge/mqtt_graph_bridge.py	Tue Dec 11 19:13:06 2018 -0800
+++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py	Wed Dec 12 01:10:48 2018 -0800
@@ -1,16 +1,12 @@
 from docopt import docopt
-from mqtt.client.factory import MQTTFactory
 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
 from rdflib import Namespace, URIRef, Literal, Graph
 from rdflib.parser import StringInputSource
-from twisted.application.internet import ClientService, backoffPolicy
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
-from twisted.internet.endpoints import clientFromString
 import cyclone.web
 import sys, logging
+from mqtt_client import MqttClient
 
-BROKER = "tcp:bang:1883"
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 devs = {
@@ -20,44 +16,6 @@
 logging.basicConfig()
 log = logging.getLogger()
 
-
-class MQTTService(ClientService):
-
-    def __init(self, endpoint, factory):
-        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
-
-    def startService(self):
-        self.whenConnected().addCallback(self.connectToBroker)
-        ClientService.startService(self)
-
-    @inlineCallbacks
-    def connectToBroker(self, protocol):
-        self.protocol = protocol
-        self.protocol.onDisconnection = self.onDisconnection
-        # We are issuing 3 publish in a row
-        # if order matters, then set window size to 1
-        # Publish requests beyond window size are enqueued
-        self.protocol.setWindowSize(1)
-
-        try:
-            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
-        except Exception as e:
-            log.error("Connecting to {broker} raised {excp!s}",
-                      broker=BROKER, excp=e)
-        else:
-            log.info("Connected to {broker}".format(broker=BROKER))
-
-    def onDisconnection(self, reason):
-        log.warn("Connection to broker lost: %r", reason)
-        self.whenConnected().addCallback(self.connectToBroker)
-
-    def publish(self, topic, msg):
-        def _logFailure(failure):
-            log.warn("publish failed: %s", failure.getErrorMessage())
-            return failure
-
-        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
-
 def rdfGraphBody(body, headers):
     g = Graph()
     g.parse(StringInputSource(body), format='nt')
@@ -85,7 +43,9 @@
         for dev, attrs in devs.items():
             if stmt[0:2] == (dev, ROOM['brightness']):
                 sw = 'OFF' if stmt[2].toPython() == 0 else 'ON'
-                serv.publish("%s/w1/light/switch" % attrs['root'], sw)
+                self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw)
+                self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'],
+                                           '200,255,200' if sw == 'ON' else '0,0,0')
                 self.settings.masterGraph.patchObject(attrs['ctx'],
                                                       stmt[0], stmt[1], stmt[2])
                 return
@@ -105,9 +65,7 @@
 
     masterGraph = PatchableGraph()
 
-    factory    = MQTTFactory(profile=MQTTFactory.PUBLISHER)
-    myEndpoint = clientFromString(reactor, BROKER)
-    serv       = MQTTService(myEndpoint, factory)
+    mqtt = MqttClient(brokerPort=1883)
 
     port = 10008
     reactor.listenTCP(port, cyclone.web.Application([
@@ -115,12 +73,12 @@
         (r"/graph/events", CycloneGraphEventsHandler,
          {'masterGraph': masterGraph}),
         (r'/output', OutputPage),
-        ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::')
+        ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::')
     log.warn('serving on %s', port)
 
     for dev, attrs in devs.items():
         masterGraph.patchObject(attrs['ctx'],
                                 dev, ROOM['brightness'], Literal(0.0))
     
-    serv.startService()
+    
     reactor.run()