Mercurial > code > home > repos > homeauto
changeset 378:b90d9321d2ce
factor common mqtt code out of mqtt_graph_bridge
Ignore-this: 21b54376d0b00f6709f1947c198cb4a8
author | drewp@bigasterisk.com |
---|---|
date | Wed, 12 Dec 2018 01:10:48 -0800 |
parents | 5b690bfc31b2 |
children | 67cebf7a14de |
files | lib/mqtt_client.py service/mqtt_graph_bridge/mqtt_graph_bridge.py service/mqtt_graph_bridge/requirements.txt |
diffstat | 3 files changed, 98 insertions(+), 49 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/mqtt_client.py Wed Dec 12 01:10:48 2018 -0800 @@ -0,0 +1,90 @@ +from mqtt.client.factory import MQTTFactory +from twisted.application.internet import ClientService, backoffPolicy +from twisted.internet.defer import inlineCallbacks, Deferred +from twisted.internet.endpoints import clientFromString +import logging +from twisted.internet import reactor +from rx.concurrency import TwistedScheduler +from rx import Observable + +log = logging.getLogger('mqtt_client') + +class MQTTService(ClientService): + + def __init__(self, endpoint, factory): + self.endpoint = endpoint + ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) + + def startService(self): + self.whenConnected().addCallback(self.connectToBroker) + ClientService.startService(self) + + @inlineCallbacks + def connectToBroker(self, protocol): + self.protocol = protocol + self.protocol.onDisconnection = self.onDisconnection + # We are issuing 3 publish in a row + # if order matters, then set window size to 1 + # Publish requests beyond window size are enqueued + self.protocol.setWindowSize(1) + + try: + yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) + except Exception as e: + log.error("Connecting to {broker} raised {excp!s}", + broker=self.endpoint, excp=e) + else: + log.info("Connected to {broker}".format(broker=self.endpoint)) + if getattr(self, 'onMqttConnectionMade', False): + self.onMqttConnectionMade() + + def onDisconnection(self, reason): + log.warn("Connection to broker lost: %r", reason) + self.whenConnected().addCallback(self.connectToBroker) + + def publish(self, topic, msg): + def _logFailure(failure): + log.warn("publish failed: %s", failure.getErrorMessage()) + return failure + + return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) + + +class MqttClient(object): + def __init__(self, brokerHost='bang', brokerPort=1883): + + #scheduler = TwistedScheduler(reactor) + + factory = MQTTFactory(profile=MQTTFactory.PUBLISHER | MQTTFactory.SUBSCRIBER) + myEndpoint = clientFromString(reactor, 'tcp:%s:%s' % (brokerHost, brokerPort)) + myEndpoint.__class__.__repr__ = lambda self: repr('%s:%s' % (self._host, self._port)) + self.serv = MQTTService(myEndpoint, factory) + self.serv.startService() + + def publish(self, topic, msg): + return self.serv.publish(topic, msg) + + def subscribe(self, topic): + """returns rx.Observable of payload strings""" + # This is surely broken for multiple topics and subscriptions. Might not even + # work over a reconnect. + + ret = Observable.create(self._observe_msgs) + + self.serv.onMqttConnectionMade = lambda: self._resubscribe(topic) + if (hasattr(self.serv, 'protocol') and + self.serv.protocol.state ==self.serv.protocol.CONNECTED): + self._resubscribe(topic) + return ret + + def _resubscribe(self, topic): + log.info('subscribing %r', topic) + self.serv.protocol.onPublish = self._onPublish + return self.serv.protocol.subscribe(topic, 2) + + def _observe_msgs(self, observer): + self.obs = observer + + def _onPublish(self, topic, payload, qos, dup, retain, msgId): + log.debug('received payload %r', payload) + self.obs.on_next(payload)
--- a/service/mqtt_graph_bridge/mqtt_graph_bridge.py Tue Dec 11 19:13:06 2018 -0800 +++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py Wed Dec 12 01:10:48 2018 -0800 @@ -1,16 +1,12 @@ from docopt import docopt -from mqtt.client.factory import MQTTFactory from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler from rdflib import Namespace, URIRef, Literal, Graph from rdflib.parser import StringInputSource -from twisted.application.internet import ClientService, backoffPolicy from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks -from twisted.internet.endpoints import clientFromString import cyclone.web import sys, logging +from mqtt_client import MqttClient -BROKER = "tcp:bang:1883" ROOM = Namespace('http://projects.bigasterisk.com/room/') devs = { @@ -20,44 +16,6 @@ logging.basicConfig() log = logging.getLogger() - -class MQTTService(ClientService): - - def __init(self, endpoint, factory): - ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) - - def startService(self): - self.whenConnected().addCallback(self.connectToBroker) - ClientService.startService(self) - - @inlineCallbacks - def connectToBroker(self, protocol): - self.protocol = protocol - self.protocol.onDisconnection = self.onDisconnection - # We are issuing 3 publish in a row - # if order matters, then set window size to 1 - # Publish requests beyond window size are enqueued - self.protocol.setWindowSize(1) - - try: - yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) - except Exception as e: - log.error("Connecting to {broker} raised {excp!s}", - broker=BROKER, excp=e) - else: - log.info("Connected to {broker}".format(broker=BROKER)) - - def onDisconnection(self, reason): - log.warn("Connection to broker lost: %r", reason) - self.whenConnected().addCallback(self.connectToBroker) - - def publish(self, topic, msg): - def _logFailure(failure): - log.warn("publish failed: %s", failure.getErrorMessage()) - return failure - - return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) - def rdfGraphBody(body, headers): g = Graph() g.parse(StringInputSource(body), format='nt') @@ -85,7 +43,9 @@ for dev, attrs in devs.items(): if stmt[0:2] == (dev, ROOM['brightness']): sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' - serv.publish("%s/w1/light/switch" % attrs['root'], sw) + self.settings.mqtt.publish("%s/w1/light/switch" % attrs['root'], sw) + self.settings.mqtt.publish("%s/rgb/rgb/set" % attrs['root'], + '200,255,200' if sw == 'ON' else '0,0,0') self.settings.masterGraph.patchObject(attrs['ctx'], stmt[0], stmt[1], stmt[2]) return @@ -105,9 +65,7 @@ masterGraph = PatchableGraph() - factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) - myEndpoint = clientFromString(reactor, BROKER) - serv = MQTTService(myEndpoint, factory) + mqtt = MqttClient(brokerPort=1883) port = 10008 reactor.listenTCP(port, cyclone.web.Application([ @@ -115,12 +73,12 @@ (r"/graph/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/output', OutputPage), - ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') + ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::') log.warn('serving on %s', port) for dev, attrs in devs.items(): masterGraph.patchObject(attrs['ctx'], dev, ROOM['brightness'], Literal(0.0)) - serv.startService() + reactor.run()