Mercurial > code > home > repos > homeauto
diff service/mqtt_graph_bridge/mqtt_graph_bridge.py @ 1183:6561367aa60a
factor common mqtt code out of mqtt_graph_bridge
Ignore-this: 21b54376d0b00f6709f1947c198cb4a8
darcs-hash:6d93c35ea6d305744693e3cb8366577b4183ca05
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Wed, 12 Dec 2018 01:10:48 -0800 |
parents | e3991af5bd39 |
children | 79d041273e26 |
line wrap: on
line diff
--- a/service/mqtt_graph_bridge/mqtt_graph_bridge.py Tue Dec 11 19:13:06 2018 -0800 +++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py Wed Dec 12 01:10:48 2018 -0800 @@ -1,16 +1,12 @@ from docopt import docopt -from mqtt.client.factory import MQTTFactory from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler from rdflib import Namespace, URIRef, Literal, Graph from rdflib.parser import StringInputSource -from twisted.application.internet import ClientService, backoffPolicy from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks -from twisted.internet.endpoints import clientFromString import cyclone.web import sys, logging +from mqtt_client import MqttClient -BROKER = "tcp:bang:1883" ROOM = Namespace('http://projects.bigasterisk.com/room/') devs = { @@ -20,44 +16,6 @@ logging.basicConfig() log = logging.getLogger() - -class MQTTService(ClientService): - - def __init(self, endpoint, factory): - ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) - - def startService(self): - self.whenConnected().addCallback(self.connectToBroker) - ClientService.startService(self) - - @inlineCallbacks - def connectToBroker(self, protocol): - self.protocol = protocol - self.protocol.onDisconnection = self.onDisconnection - # We are issuing 3 publish in a row - # if order matters, then set window size to 1 - # Publish requests beyond window size are enqueued - self.protocol.setWindowSize(1) - - try: - yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) - except Exception as e: - log.error("Connecting to {broker} raised {excp!s}", - broker=BROKER, excp=e) - else: - log.info("Connected to {broker}".format(broker=BROKER)) - - def onDisconnection(self, reason): - log.warn("Connection to broker lost: %r", reason) - self.whenConnected().addCallback(self.connectToBroker) - - def publish(self, topic, msg): - def _logFailure(failure): - log.warn("publish failed: %s", failure.getErrorMessage()) - return failure - - return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) - def rdfGraphBody(body, headers): g = Graph() g.parse(StringInputSource(body), format='nt') @@ -85,7 +43,9 @@ for dev, attrs in devs.items(): if stmt[0:2] == (dev, ROOM['brightness']): sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' - serv.publish("%s/w1/light/switch" % attrs['root'], sw) + self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw) + self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'], + '200,255,200' if sw == 'ON' else '0,0,0') self.settings.masterGraph.patchObject(attrs['ctx'], stmt[0], stmt[1], stmt[2]) return @@ -105,9 +65,7 @@ masterGraph = PatchableGraph() - factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) - myEndpoint = clientFromString(reactor, BROKER) - serv = MQTTService(myEndpoint, factory) + mqtt = MqttClient(brokerPort=1883) port = 10008 reactor.listenTCP(port, cyclone.web.Application([ @@ -115,12 +73,12 @@ (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), - ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') + ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::') log.warn('serving on %s', port) for dev, attrs in devs.items(): masterGraph.patchObject(attrs['ctx'], dev, ROOM['brightness'], Literal(0.0)) - serv.startService() + reactor.run()