changeset 1519:c7217cf1cfc1

add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated Ignore-this: 11b16f06340adc7228930f3774323d1a darcs-hash:a247b60725d7617a4712d6b8e914531020e82f80
author drewp <drewp@bigasterisk.com>
date Tue, 04 Feb 2020 23:33:21 -0800
parents 112258f2591b
children 4c780a079731
files service/rdf_from_mqtt/Dockerfile service/rdf_from_mqtt/config_bed_bar.n3 service/rdf_from_mqtt/config_cardreader.n3 service/rdf_from_mqtt/config_nightlight_ari.n3 service/rdf_from_mqtt/index.html service/rdf_from_mqtt/rdf_from_mqtt.py service/rdf_from_mqtt/requirements.txt service/rdf_from_mqtt/serv.n3 service/rdf_from_mqtt/tasks.py
diffstat 9 files changed, 367 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/Dockerfile	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,13 @@
+FROM bang6:5000/base_x86
+
+WORKDIR /opt
+
+COPY requirements.txt ./
+RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt
+RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3'
+
+COPY *.py *.html *.css *.js *.n3 ./
+
+EXPOSE 10018:10018
+
+CMD [ "python3", "./rdf_from_mqtt.py" ]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/config_bed_bar.n3	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,27 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix fr: <http://bigasterisk.com/foaf/> .
+
+:buttonMap a :ValueMap;
+    :map [:from "OFF"; :to :notPressed], [:from "ON"; :to :pressed]
+  .
+
+:bedBarAsherButton1 a :MqttStatementSource;
+  :mqttTopic ("bed_bar_asher" "binary_sensor" "button_1" "state");
+  :parser :buttonMap;
+  :graphStatements [:outputPredicate :state;] .
+  
+:bedBarAsherButton2 a :MqttStatementSource;
+  :mqttTopic ("bed_bar_asher" "binary_sensor" "button_2" "state");
+  :parser :buttonMap;
+  :graphStatements [:outputPredicate :state;] .
+  
+:bedBarAsherButton3 a :MqttStatementSource;
+  :mqttTopic ("bed_bar_asher" "binary_sensor" "button_3" "state");
+  :parser :buttonMap;
+  :graphStatements [:outputPredicate :state;] .
+  
+:bedBarAsherButton4 a :MqttStatementSource;
+  :mqttTopic ("bed_bar_asher" "binary_sensor" "button_4" "state");
+  :parser :buttonMap;
+  :graphStatements [:outputPredicate :state;] .
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/config_cardreader.n3	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,14 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix fr: <http://bigasterisk.com/foaf/> .
+
+:cardReader a :MqttStatementSource;
+  :mqttTopic ("frontwindow" "tag");
+  :parser :tagIdToUri;  # AA-BB-CC-DD to <http://bigasterisk.com/rfidCard/aabbccdd>
+  
+  :graphStatements [
+     :outputPredicate :currentRead;
+     :statementLifetime "5s";
+  ]
+  .
+  
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/config_nightlight_ari.n3	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,26 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix fr: <http://bigasterisk.com/foaf/> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+
+:nightlightAriTemperature a :MqttStatementSource;
+  :mqttTopic ("nightlight_ari" "sensor" "temperature" "state");
+
+  :parser xsd:double;
+  :conversions (:celsiusToFarenheit
+                [:ignoreValueBelow -999]);
+  :graphStatements [
+    :outputPredicate :temperatureF;  
+    :statementLifetime "150s";
+  # ], [
+  #  :conversions ([:recentLow "30s"]);
+  #  :outputPredicate :recentLowTemperatureF;
+  ];
+  
+  :influxMeasurement [ # replaces this block in piNode configs
+    :measurement "temperatureF";
+    :predicate :temperatureF;
+    :tag [:key "host"; :value "nightlight_ari"],
+         [:key "location"; :value "ariRoom"]] .
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/index.html	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,32 @@
+<!doctype html>
+<html>
+  <head>
+    <title>rdf_from_mqtt</title>
+    <meta charset="utf-8">
+    <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents.min.js"></script>
+    <script src="/lib/require/require-2.3.3.js"></script>
+    <script src="/rdf/common_paths_and_ns.js"></script>
+
+    <link rel="stylesheet" href="/rdf/browse/style.css">
+
+    <link rel="import" href="/rdf/streamed-graph.html">
+    <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html">
+
+    <meta name="mobile-web-app-capable" content="yes">
+    <meta name="viewport" content="width=device-width, initial-scale=1">
+  </head>
+  <body class="rdfBrowsePage">
+    <template id="t" is="dom-bind">
+      <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph>
+      <div id="out"></div>
+      <script type="module" src="/rdf/streamed_graph_view.js"></script>
+    </template>
+
+      <div class="served-resources">
+        <a href="stats/">/stats/</a>
+        <a href="mqtt">/mqtt</a>
+        <a href="mqtt/events">/mqtt/events</a>
+      </div>
+
+  </body>
+</html>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/rdf_from_mqtt.py	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,202 @@
+"""
+Subscribe to mqtt topics; generate RDF statements.
+"""
+import json
+import sys
+from docopt import docopt
+from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
+from rdflib.parser import StringInputSource
+from rdflib.term import Node
+from twisted.internet import reactor
+import cyclone.web
+import rx, rx.operators, rx.scheduler.eventloop
+from greplin import scales
+from greplin.scales.cyclonehandler import StatsHandler
+
+from export_to_influxdb import InfluxExporter
+from mqtt_client import MqttClient
+
+from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import graphFromQuads
+from standardservice.logsetup import log, verboseLogging
+from standardservice.scalessetup import gatherProcessStats
+
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+gatherProcessStats()
+
+def parseDurationLiteral(lit: Literal) -> float:
+    if lit.endswith('s'):
+        return float(lit.split('s')[0])
+    raise NotImplementedError(f'duration literal: {lit}')
+
+
+class MqttStatementSource:
+    def __init__(self, uri, config, masterGraph, mqtt, influx):
+        self.uri = uri
+        self.config = config
+        self.masterGraph = masterGraph
+        self.mqtt = mqtt
+        self.influx = influx
+
+        self.mqttTopic = self.topicFromConfig(self.config)
+
+        statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
+        scales.init(self, statPath)
+        self._mqttStats = scales.collection(
+            statPath + '/incoming', scales.IntStat('count'),
+            scales.RecentFpsStat('fps'))
+
+
+        rawBytes = self.subscribeMqtt()
+        rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
+        parsed = self.getParser()(rawBytes)
+
+        g = self.config
+        for conv in g.items(g.value(self.uri, ROOM['conversions'])):
+            parsed = self.conversionStep(conv)(parsed)
+
+        outputQuadsSets = rx.combine_latest(
+            *[self.makeQuads(parsed, plan)
+              for plan in g.objects(self.uri, ROOM['graphStatements'])])
+
+        outputQuadsSets.subscribe_(self.updateQuads)
+
+    def topicFromConfig(self, config) -> bytes:
+        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
+        return b'/'.join(t.encode('ascii') for t in topicParts)
+
+
+    def subscribeMqtt(self):
+        return self.mqtt.subscribe(self.mqttTopic)
+
+    def countIncomingMessage(self, _):
+        self._mqttStats.fps.mark()
+        self._mqttStats.count += 1
+
+    def getParser(self):
+        g = self.config
+        parser = g.value(self.uri, ROOM['parser'])
+        if parser == XSD.double:
+            return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
+        elif parser == ROOM['tagIdToUri']:
+            return rx.operators.map(self.tagIdToUri)
+        elif parser == ROOM['onOffBrightness']:
+            return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
+        elif parser == ROOM['jsonBrightness']:
+            return rx.operators.map(self.parseJsonBrightness)
+        elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
+            return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
+        else:
+            raise NotImplementedError(parser)
+
+    def parseJsonBrightness(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('ascii'))
+        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
+
+    def conversionStep(self, conv: Node):
+        g = self.config
+        if conv == ROOM['celsiusToFarenheit']:
+            return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2)))
+        elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
+            threshold = g.value(conv, ROOM['ignoreValueBelow'])
+            return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
+        else:
+            raise NotImplementedError(conv)
+
+    def makeQuads(self, parsed, plan):
+        g = self.config
+        def quadsFromValue(valueNode):
+            return set([
+                (self.uri,
+                 g.value(plan, ROOM['outputPredicate']),
+                 valueNode,
+                 self.uri)
+            ])
+
+        def emptyQuads(element):
+            return set([])
+
+        quads = rx.operators.map(quadsFromValue)(parsed)
+
+        dur = g.value(plan, ROOM['statementLifetime'])
+        if dur is not None:
+            sec = parseDurationLiteral(dur)
+            quads = quads.pipe(
+                rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)),
+                rx.operators.map(emptyQuads),
+                rx.operators.merge(quads),
+                )
+
+        return quads
+
+    def updateQuads(self, newGraphs):
+        newQuads = set.union(*newGraphs)
+        g = graphFromQuads(newQuads)
+        log.debug(f'{self.uri} update to {len(newQuads)} statements')
+
+        self.influx.exportToInflux(newQuads)
+
+        self.masterGraph.patchSubgraph(self.uri, g)
+
+    def tagIdToUri(self, value: bytearray) -> URIRef:
+        justHex = value.decode('ascii').replace('-', '').lower()
+        int(justHex, 16)  # validate
+        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
+
+    def remap(self, parser, valueStr: str):
+        g = self.config
+        value = Literal(valueStr)
+        for entry in g.objects(parser, ROOM['map']):
+            if value == g.value(entry, ROOM['from']):
+                return g.value(entry, ROOM['to'])
+        raise KeyError(value)
+
+
+if __name__ == '__main__':
+    arg = docopt("""
+    Usage: rdf_from_mqtt.py [options]
+
+    -v        Verbose
+    --cs=STR  Only process config filenames with this substring
+    """)
+    verboseLogging(arg['-v'])
+
+    config = Graph()
+    for fn in [
+            "config_cardreader.n3",
+            "config_nightlight_ari.n3",
+            "config_bed_bar.n3",
+            "config_air_quality_indoor.n3",
+            "config_air_quality_outdoor.n3",
+            "config_living_lamps.n3",
+            "config_kitchen.n3",
+    ]:
+        if not arg['--cs'] or arg['--cs'] in fn:
+            config.parse(fn, format='n3')
+
+    masterGraph = PatchableGraph()
+
+    mqtt = MqttClient(clientId='rdf_from_mqtt', brokerHost='bang',
+                      brokerPort=1883)
+    influx = InfluxExporter(config)
+
+    srcs = []
+    for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
+        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx))
+    log.info(f'set up {len(srcs)} sources')
+
+    port = 10018
+    reactor.listenTCP(port, cyclone.web.Application([
+        (r"/()", cyclone.web.StaticFileHandler,
+         {"path": ".", "default_filename": "index.html"}),
+        (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_from_mqtt'}),
+        (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
+        (r"/graph/mqtt/events", CycloneGraphEventsHandler,
+         {'masterGraph': masterGraph}),
+        ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']),
+                      interface='::')
+    log.warn('serving on %s', port)
+
+    reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/requirements.txt	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,15 @@
+pytype
+
+cyclone
+rdflib-jsonld==0.4.0
+rdflib==4.2.2
+twisted-mqtt==0.3.6
+git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales
+git+https://github.com/ReactiveX/RxPY.git@6deb66e827f34a88b4605773d7671322b9cbbd08#egg=rx
+
+cycloneerr
+export_to_influxdb==0.4.0
+mqtt_client==0.9.0
+patchablegraph==0.11.0
+rdfdb==0.21.0
+standardservice==0.6.0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/serv.n3	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,24 @@
+@prefix : <http://bigasterisk.com/ns/serv#> .
+@prefix auth: <http://bigasterisk.com/ns/serv/auth#> .
+@prefix serv: <http://bigasterisk.com/services/> .
+
+serv:rdf_from_mqtt a :Service;
+      :path "/rdf_from_mqtt/";
+      :openid auth:admin;
+      :serverHost "bang";
+      :internalPort 10018;
+      :prodDockerFlags (
+      "-p" "10018:10018"
+      "--net=host");
+      :localDockerFlags (
+        "-v" "`pwd`:/opt"
+        "-v" "/my/proj/homeauto/lib:/lib_src"
+      );
+      :localRunCmdline (
+      
+      "python3" "rdf_from_mqtt.py" "-v"
+#"--cs" "living"
+);
+      :dockerFile "Dockerfile"
+.
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/rdf_from_mqtt/tasks.py	Tue Feb 04 23:33:21 2020 -0800
@@ -0,0 +1,14 @@
+from invoke import task, Collection
+
+import sys
+sys.path.append('/my/proj/release')
+from serv_tasks import serv_tasks
+
+ns = Collection()
+serv_tasks(ns, 'serv.n3', 'rdf_from_mqtt')
+
+@ns.add_task
+@task
+def tail_mqtt(ctx):
+    internal_mqtt_port = 10010
+    ctx.run(f'mosquitto_sub -h bang -p 1883 -d -v -t \#')