Mercurial > code > home > repos > homeauto
changeset 373:2158e7ad19b1
receive oneshot updates from reasoning; emit commands on MQTT to control H801 wifi dimmer
Ignore-this: 3b180d528a9bd7f2f330e565311013d6
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Dec 2018 01:48:37 -0800 |
parents | c52e7abdd6b1 |
children | 4138cd1924f0 |
files | service/mqtt_graph_bridge/Dockerfile service/mqtt_graph_bridge/makefile service/mqtt_graph_bridge/mqtt_graph_bridge.py service/mqtt_graph_bridge/readme service/mqtt_graph_bridge/requirements.txt |
diffstat | 5 files changed, 169 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_graph_bridge/Dockerfile Sat Dec 08 01:48:37 2018 -0800 @@ -0,0 +1,12 @@ +FROM bang6:5000/base_x86 + +WORKDIR /opt + +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +COPY *.py *.html *.css *.js ./ + +EXPOSE 10008:10008 + +CMD [ "python", "./mqtt_graph_bridge.py", "-v" ]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_graph_bridge/makefile Sat Dec 08 01:48:37 2018 -0800 @@ -0,0 +1,19 @@ +JOB=mqtt_graph_bridge +PORT=10008 + +TAG=bang6:5000/${JOB}_x86:latest + +build_image: + rm -rf tmp_ctx + mkdir -p tmp_ctx + cp -a Dockerfile ../../lib/*.py *.py *.txt tmp_ctx + docker build --network=host -t ${TAG} tmp_ctx + docker push ${TAG} + rm -rf tmp_ctx + + +shell: + docker run --rm -it --cap-add SYS_PTRACE --net=host bang6:5000/mqtt_graph_bridge_x86:latest /bin/sh + +local_run: + docker run --rm -it --net=host bang6:5000/mqtt_graph_bridge_x86:latest
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py Sat Dec 08 01:48:37 2018 -0800 @@ -0,0 +1,126 @@ +from docopt import docopt +from mqtt.client.factory import MQTTFactory +from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler +from rdflib import Namespace, URIRef, Literal, Graph +from rdflib.parser import StringInputSource +from twisted.application.internet import ClientService, backoffPolicy +from twisted.internet import reactor +from twisted.internet.defer import inlineCallbacks +from twisted.internet.endpoints import clientFromString +import cyclone.web +import sys, logging + +BROKER = "tcp:bang:1883" +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +devs = { + ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']} +} + +logging.basicConfig() +log = logging.getLogger() + + +class MQTTService(ClientService): + + def __init(self, endpoint, factory): + ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy()) + + def startService(self): + self.whenConnected().addCallback(self.connectToBroker) + ClientService.startService(self) + + @inlineCallbacks + def connectToBroker(self, protocol): + self.protocol = protocol + self.protocol.onDisconnection = self.onDisconnection + # We are issuing 3 publish in a row + # if order matters, then set window size to 1 + # Publish requests beyond window size are enqueued + self.protocol.setWindowSize(1) + + try: + yield self.protocol.connect("TwistedMQTT-pub", keepalive=60) + except Exception as e: + log.error("Connecting to {broker} raised {excp!s}", + broker=BROKER, excp=e) + else: + log.info("Connected to {broker}".format(broker=BROKER)) + + def onDisconnection(self, reason): + log.warn("Connection to broker lost: %r", reason) + self.whenConnected().addCallback(self.connectToBroker) + + def publish(self, topic, msg): + def _logFailure(failure): + log.warn("publish failed: %s", failure.getErrorMessage()) + return failure + + return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure) + +def rdfGraphBody(body, headers): + g = Graph() + g.parse(StringInputSource(body), format='nt') + return g + +class OutputPage(cyclone.web.RequestHandler): + def put(self): + arg = self.request.arguments + if arg.get('s') and arg.get('p'): + subj = URIRef(arg['s'][-1]) + pred = URIRef(arg['p'][-1]) + turtleLiteral = self.request.body + try: + obj = Literal(float(turtleLiteral)) + except ValueError: + obj = Literal(turtleLiteral) + stmt = (subj, pred, obj) + else: + g = rdfGraphBody(self.request.body, self.request.headers) + assert len(g) == 1, len(g) + stmt = g.triples((None, None, None)).next() + self._onStatement(stmt) + + def _onStatement(self, stmt): + for dev, attrs in devs.items(): + if stmt[0:2] == (dev, ROOM['brightness']): + sw = 'OFF' if stmt[2].toPython() == 0 else 'ON' + serv.publish("%s/w1/light/switch" % attrs['root'], sw) + self.settings.masterGraph.patchObject(attrs['ctx'], + stmt[0], stmt[1], stmt[2]) + return + log.warn("ignoring %s", stmt) + +if __name__ == '__main__': + arg = docopt(""" + Usage: mqtt_graph_bridge.py [options] + + -v Verbose + """) + log.setLevel(logging.WARN) + if arg['-v']: + from twisted.python import log as twlog + twlog.startLogging(sys.stdout) + log.setLevel(logging.DEBUG) + + masterGraph = PatchableGraph() + + factory = MQTTFactory(profile=MQTTFactory.PUBLISHER) + myEndpoint = clientFromString(reactor, BROKER) + serv = MQTTService(myEndpoint, factory) + + port = 10008 + reactor.listenTCP(port, cyclone.web.Application([ + (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/events", CycloneGraphEventsHandler, + {'masterGraph': masterGraph}), + (r'/output', OutputPage), + ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::') + log.warn('serving on %s', port) + + for dev, attrs in devs.items(): + masterGraph.patchObject(attrs['ctx'], + dev, ROOM['brightness'], Literal(0.0)) + + serv.startService() + reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_graph_bridge/readme Sat Dec 08 01:48:37 2018 -0800 @@ -0,0 +1,7 @@ +tail all changes: +mosquitto_sub -v -t 004BD965/\# + +bang(pts/8):~% mosquitto_pub -t 004BD965/w1/light/switch -m ON +bang(pts/8):~% mosquitto_pub -t 004BD965/w1/light/switch -m OFF + +