changeset 373:2158e7ad19b1

receive oneshot updates from reasoning; emit commands on MQTT to control H801 wifi dimmer Ignore-this: 3b180d528a9bd7f2f330e565311013d6
author drewp@bigasterisk.com
date Sat, 08 Dec 2018 01:48:37 -0800
parents c52e7abdd6b1
children 4138cd1924f0
files service/mqtt_graph_bridge/Dockerfile service/mqtt_graph_bridge/makefile service/mqtt_graph_bridge/mqtt_graph_bridge.py service/mqtt_graph_bridge/readme service/mqtt_graph_bridge/requirements.txt
diffstat 5 files changed, 169 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_graph_bridge/Dockerfile	Sat Dec 08 01:48:37 2018 -0800
@@ -0,0 +1,12 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip install -r requirements.txt
+
+COPY *.py *.html *.css *.js ./
+
+EXPOSE 10008:10008
+
+CMD [ "python", "./mqtt_graph_bridge.py", "-v" ]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_graph_bridge/makefile	Sat Dec 08 01:48:37 2018 -0800
@@ -0,0 +1,19 @@
+JOB=mqtt_graph_bridge
+PORT=10008
+
+TAG=bang6:5000/${JOB}_x86:latest
+
+build_image:
+	rm -rf tmp_ctx
+	mkdir -p tmp_ctx
+	cp -a Dockerfile ../../lib/*.py *.py *.txt tmp_ctx
+	docker build --network=host -t ${TAG} tmp_ctx
+	docker push ${TAG}
+	rm -rf tmp_ctx
+
+
+shell:
+	docker run --rm -it --cap-add SYS_PTRACE --net=host bang6:5000/mqtt_graph_bridge_x86:latest  /bin/sh
+
+local_run:
+	docker run --rm -it --net=host bang6:5000/mqtt_graph_bridge_x86:latest
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_graph_bridge/mqtt_graph_bridge.py	Sat Dec 08 01:48:37 2018 -0800
@@ -0,0 +1,126 @@
+from docopt import docopt
+from mqtt.client.factory import MQTTFactory
+from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
+from rdflib import Namespace, URIRef, Literal, Graph
+from rdflib.parser import StringInputSource
+from twisted.application.internet import ClientService, backoffPolicy
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.endpoints import clientFromString
+import cyclone.web
+import sys, logging
+
+BROKER = "tcp:bang:1883"
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+devs = {
+    ROOM['kitchenLight']: {'root': '004BD965', 'ctx': ROOM['kitchenH801']}
+}
+
+logging.basicConfig()
+log = logging.getLogger()
+
+
+class MQTTService(ClientService):
+
+    def __init(self, endpoint, factory):
+        ClientService.__init__(self, endpoint, factory, retryPolicy=backoffPolicy())
+
+    def startService(self):
+        self.whenConnected().addCallback(self.connectToBroker)
+        ClientService.startService(self)
+
+    @inlineCallbacks
+    def connectToBroker(self, protocol):
+        self.protocol = protocol
+        self.protocol.onDisconnection = self.onDisconnection
+        # We are issuing 3 publish in a row
+        # if order matters, then set window size to 1
+        # Publish requests beyond window size are enqueued
+        self.protocol.setWindowSize(1)
+
+        try:
+            yield self.protocol.connect("TwistedMQTT-pub", keepalive=60)
+        except Exception as e:
+            log.error("Connecting to {broker} raised {excp!s}",
+                      broker=BROKER, excp=e)
+        else:
+            log.info("Connected to {broker}".format(broker=BROKER))
+
+    def onDisconnection(self, reason):
+        log.warn("Connection to broker lost: %r", reason)
+        self.whenConnected().addCallback(self.connectToBroker)
+
+    def publish(self, topic, msg):
+        def _logFailure(failure):
+            log.warn("publish failed: %s", failure.getErrorMessage())
+            return failure
+
+        return self.protocol.publish(topic=topic, qos=0, message=msg).addErrback(_logFailure)
+
+def rdfGraphBody(body, headers):
+    g = Graph()
+    g.parse(StringInputSource(body), format='nt')
+    return g
+
+class OutputPage(cyclone.web.RequestHandler):
+    def put(self):
+        arg = self.request.arguments
+        if arg.get('s') and arg.get('p'):
+            subj = URIRef(arg['s'][-1])
+            pred = URIRef(arg['p'][-1])
+            turtleLiteral = self.request.body
+            try:
+                obj = Literal(float(turtleLiteral))
+            except ValueError:
+                obj = Literal(turtleLiteral)
+            stmt = (subj, pred, obj)
+        else:
+            g = rdfGraphBody(self.request.body, self.request.headers)
+            assert len(g) == 1, len(g)
+            stmt = g.triples((None, None, None)).next()
+        self._onStatement(stmt)
+            
+    def _onStatement(self, stmt):
+        for dev, attrs in devs.items():
+            if stmt[0:2] == (dev, ROOM['brightness']):
+                sw = 'OFF' if stmt[2].toPython() == 0 else 'ON'
+                serv.publish("%s/w1/light/switch" % attrs['root'], sw)
+                self.settings.masterGraph.patchObject(attrs['ctx'],
+                                                      stmt[0], stmt[1], stmt[2])
+                return
+        log.warn("ignoring %s", stmt)
+            
+if __name__ == '__main__':
+    arg = docopt("""
+    Usage: mqtt_graph_bridge.py [options]
+
+    -v   Verbose
+    """)
+    log.setLevel(logging.WARN)
+    if arg['-v']:
+        from twisted.python import log as twlog
+        twlog.startLogging(sys.stdout)
+        log.setLevel(logging.DEBUG)
+
+    masterGraph = PatchableGraph()
+
+    factory    = MQTTFactory(profile=MQTTFactory.PUBLISHER)
+    myEndpoint = clientFromString(reactor, BROKER)
+    serv       = MQTTService(myEndpoint, factory)
+
+    port = 10008
+    reactor.listenTCP(port, cyclone.web.Application([
+        (r"/graph", CycloneGraphHandler, {'masterGraph': masterGraph}),
+        (r"/graph/events", CycloneGraphEventsHandler,
+         {'masterGraph': masterGraph}),
+        (r'/output', OutputPage),
+        ], serv=serv, masterGraph=masterGraph, debug=arg['-v']), interface='::')
+    log.warn('serving on %s', port)
+
+    for dev, attrs in devs.items():
+        masterGraph.patchObject(attrs['ctx'],
+                                dev, ROOM['brightness'], Literal(0.0))
+    
+    serv.startService()
+    reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_graph_bridge/readme	Sat Dec 08 01:48:37 2018 -0800
@@ -0,0 +1,7 @@
+tail all changes:
+mosquitto_sub -v -t 004BD965/\# 
+
+bang(pts/8):~% mosquitto_pub -t 004BD965/w1/light/switch -m ON  
+bang(pts/8):~% mosquitto_pub -t 004BD965/w1/light/switch -m OFF
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_graph_bridge/requirements.txt	Sat Dec 08 01:48:37 2018 -0800
@@ -0,0 +1,5 @@
+cyclone
+rdflib-jsonld==0.4.0
+rdflib==4.2.2
+twisted-mqtt==0.3.6
+https://projects.bigasterisk.com/rdfdb/rdfdb-0.3.0.tar.gz