Mercurial > code > home > repos > homeauto
changeset 718:edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
Ignore-this: 11b16f06340adc7228930f3774323d1a
author | drewp@bigasterisk.com |
---|---|
date | Tue, 04 Feb 2020 23:33:21 -0800 |
parents | e9d9b8506cb3 |
children | 34343fb39fbe |
files | service/rdf_from_mqtt/Dockerfile service/rdf_from_mqtt/config_bed_bar.n3 service/rdf_from_mqtt/config_cardreader.n3 service/rdf_from_mqtt/config_nightlight_ari.n3 service/rdf_from_mqtt/index.html service/rdf_from_mqtt/rdf_from_mqtt.py service/rdf_from_mqtt/requirements.txt service/rdf_from_mqtt/serv.n3 service/rdf_from_mqtt/tasks.py |
diffstat | 9 files changed, 367 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/Dockerfile Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,13 @@ +FROM bang6:5000/base_x86 + +WORKDIR /opt + +COPY requirements.txt ./ +RUN pip3 install --index-url https://projects.bigasterisk.com/ --extra-index-url https://pypi.org/simple -r requirements.txt +RUN pip3 install -U 'https://github.com/drewp/cyclone/archive/python3.zip?v3' + +COPY *.py *.html *.css *.js *.n3 ./ + +EXPOSE 10018:10018 + +CMD [ "python3", "./rdf_from_mqtt.py" ]
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/config_bed_bar.n3 Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,27 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix fr: <http://bigasterisk.com/foaf/> . + +:buttonMap a :ValueMap; + :map [:from "OFF"; :to :notPressed], [:from "ON"; :to :pressed] + . + +:bedBarAsherButton1 a :MqttStatementSource; + :mqttTopic ("bed_bar_asher" "binary_sensor" "button_1" "state"); + :parser :buttonMap; + :graphStatements [:outputPredicate :state;] . + +:bedBarAsherButton2 a :MqttStatementSource; + :mqttTopic ("bed_bar_asher" "binary_sensor" "button_2" "state"); + :parser :buttonMap; + :graphStatements [:outputPredicate :state;] . + +:bedBarAsherButton3 a :MqttStatementSource; + :mqttTopic ("bed_bar_asher" "binary_sensor" "button_3" "state"); + :parser :buttonMap; + :graphStatements [:outputPredicate :state;] . + +:bedBarAsherButton4 a :MqttStatementSource; + :mqttTopic ("bed_bar_asher" "binary_sensor" "button_4" "state"); + :parser :buttonMap; + :graphStatements [:outputPredicate :state;] . \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/config_cardreader.n3 Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,14 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix fr: <http://bigasterisk.com/foaf/> . + +:cardReader a :MqttStatementSource; + :mqttTopic ("frontwindow" "tag"); + :parser :tagIdToUri; # AA-BB-CC-DD to <http://bigasterisk.com/rfidCard/aabbccdd> + + :graphStatements [ + :outputPredicate :currentRead; + :statementLifetime "5s"; + ] + . + \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/config_nightlight_ari.n3 Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,26 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix fr: <http://bigasterisk.com/foaf/> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . + + +:nightlightAriTemperature a :MqttStatementSource; + :mqttTopic ("nightlight_ari" "sensor" "temperature" "state"); + + :parser xsd:double; + :conversions (:celsiusToFarenheit + [:ignoreValueBelow -999]); + :graphStatements [ + :outputPredicate :temperatureF; + :statementLifetime "150s"; + # ], [ + # :conversions ([:recentLow "30s"]); + # :outputPredicate :recentLowTemperatureF; + ]; + + :influxMeasurement [ # replaces this block in piNode configs + :measurement "temperatureF"; + :predicate :temperatureF; + :tag [:key "host"; :value "nightlight_ari"], + [:key "location"; :value "ariRoom"]] . +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/index.html Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,32 @@ +<!doctype html> +<html> + <head> + <title>rdf_from_mqtt</title> + <meta charset="utf-8"> + <script src="/lib/polymer/1.0.9/webcomponentsjs/webcomponents.min.js"></script> + <script src="/lib/require/require-2.3.3.js"></script> + <script src="/rdf/common_paths_and_ns.js"></script> + + <link rel="stylesheet" href="/rdf/browse/style.css"> + + <link rel="import" href="/rdf/streamed-graph.html"> + <link rel="import" href="/lib/polymer/1.0.9/polymer/polymer.html"> + + <meta name="mobile-web-app-capable" content="yes"> + <meta name="viewport" content="width=device-width, initial-scale=1"> + </head> + <body class="rdfBrowsePage"> + <template id="t" is="dom-bind"> + <streamed-graph url="mqtt/events" graph="{{graph}}"></streamed-graph> + <div id="out"></div> + <script type="module" src="/rdf/streamed_graph_view.js"></script> + </template> + + <div class="served-resources"> + <a href="stats/">/stats/</a> + <a href="mqtt">/mqtt</a> + <a href="mqtt/events">/mqtt/events</a> + </div> + + </body> +</html>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/rdf_from_mqtt.py Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,202 @@ +""" +Subscribe to mqtt topics; generate RDF statements. +""" +import json +import sys +from docopt import docopt +from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD +from rdflib.parser import StringInputSource +from rdflib.term import Node +from twisted.internet import reactor +import cyclone.web +import rx, rx.operators, rx.scheduler.eventloop +from greplin import scales +from greplin.scales.cyclonehandler import StatsHandler + +from export_to_influxdb import InfluxExporter +from mqtt_client import MqttClient + +from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import graphFromQuads +from standardservice.logsetup import log, verboseLogging +from standardservice.scalessetup import gatherProcessStats + +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +gatherProcessStats() + +def parseDurationLiteral(lit: Literal) -> float: + if lit.endswith('s'): + return float(lit.split('s')[0]) + raise NotImplementedError(f'duration literal: {lit}') + + +class MqttStatementSource: + def __init__(self, uri, config, masterGraph, mqtt, influx): + self.uri = uri + self.config = config + self.masterGraph = masterGraph + self.mqtt = mqtt + self.influx = influx + + self.mqttTopic = self.topicFromConfig(self.config) + + statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') + scales.init(self, statPath) + self._mqttStats = scales.collection( + statPath + '/incoming', scales.IntStat('count'), + scales.RecentFpsStat('fps')) + + + rawBytes = self.subscribeMqtt() + rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) + parsed = self.getParser()(rawBytes) + + g = self.config + for conv in g.items(g.value(self.uri, ROOM['conversions'])): + parsed = self.conversionStep(conv)(parsed) + + outputQuadsSets = rx.combine_latest( + *[self.makeQuads(parsed, plan) + for plan in g.objects(self.uri, ROOM['graphStatements'])]) + + outputQuadsSets.subscribe_(self.updateQuads) + + def topicFromConfig(self, config) -> bytes: + topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) + return b'/'.join(t.encode('ascii') for t in topicParts) + + + def subscribeMqtt(self): + return self.mqtt.subscribe(self.mqttTopic) + + def countIncomingMessage(self, _): + self._mqttStats.fps.mark() + self._mqttStats.count += 1 + + def getParser(self): + g = self.config + parser = g.value(self.uri, ROOM['parser']) + if parser == XSD.double: + return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) + elif parser == ROOM['tagIdToUri']: + return rx.operators.map(self.tagIdToUri) + elif parser == ROOM['onOffBrightness']: + return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) + elif parser == ROOM['jsonBrightness']: + return rx.operators.map(self.parseJsonBrightness) + elif ROOM['ValueMap'] in g.objects(parser, RDF.type): + return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) + else: + raise NotImplementedError(parser) + + def parseJsonBrightness(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('ascii')) + return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) + + def conversionStep(self, conv: Node): + g = self.config + if conv == ROOM['celsiusToFarenheit']: + return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) + elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: + threshold = g.value(conv, ROOM['ignoreValueBelow']) + return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) + else: + raise NotImplementedError(conv) + + def makeQuads(self, parsed, plan): + g = self.config + def quadsFromValue(valueNode): + return set([ + (self.uri, + g.value(plan, ROOM['outputPredicate']), + valueNode, + self.uri) + ]) + + def emptyQuads(element): + return set([]) + + quads = rx.operators.map(quadsFromValue)(parsed) + + dur = g.value(plan, ROOM['statementLifetime']) + if dur is not None: + sec = parseDurationLiteral(dur) + quads = quads.pipe( + rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), + rx.operators.map(emptyQuads), + rx.operators.merge(quads), + ) + + return quads + + def updateQuads(self, newGraphs): + newQuads = set.union(*newGraphs) + g = graphFromQuads(newQuads) + log.debug(f'{self.uri} update to {len(newQuads)} statements') + + self.influx.exportToInflux(newQuads) + + self.masterGraph.patchSubgraph(self.uri, g) + + def tagIdToUri(self, value: bytearray) -> URIRef: + justHex = value.decode('ascii').replace('-', '').lower() + int(justHex, 16) # validate + return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') + + def remap(self, parser, valueStr: str): + g = self.config + value = Literal(valueStr) + for entry in g.objects(parser, ROOM['map']): + if value == g.value(entry, ROOM['from']): + return g.value(entry, ROOM['to']) + raise KeyError(value) + + +if __name__ == '__main__': + arg = docopt(""" + Usage: rdf_from_mqtt.py [options] + + -v Verbose + --cs=STR Only process config filenames with this substring + """) + verboseLogging(arg['-v']) + + config = Graph() + for fn in [ + "config_cardreader.n3", + "config_nightlight_ari.n3", + "config_bed_bar.n3", + "config_air_quality_indoor.n3", + "config_air_quality_outdoor.n3", + "config_living_lamps.n3", + "config_kitchen.n3", + ]: + if not arg['--cs'] or arg['--cs'] in fn: + config.parse(fn, format='n3') + + masterGraph = PatchableGraph() + + mqtt = MqttClient(clientId='rdf_from_mqtt', brokerHost='bang', + brokerPort=1883) + influx = InfluxExporter(config) + + srcs = [] + for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): + srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx)) + log.info(f'set up {len(srcs)} sources') + + port = 10018 + reactor.listenTCP(port, cyclone.web.Application([ + (r"/()", cyclone.web.StaticFileHandler, + {"path": ".", "default_filename": "index.html"}), + (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_from_mqtt'}), + (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/mqtt/events", CycloneGraphEventsHandler, + {'masterGraph': masterGraph}), + ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), + interface='::') + log.warn('serving on %s', port) + + reactor.run()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/requirements.txt Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,15 @@ +pytype + +cyclone +rdflib-jsonld==0.4.0 +rdflib==4.2.2 +twisted-mqtt==0.3.6 +git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales +git+https://github.com/ReactiveX/RxPY.git@6deb66e827f34a88b4605773d7671322b9cbbd08#egg=rx + +cycloneerr +export_to_influxdb==0.4.0 +mqtt_client==0.9.0 +patchablegraph==0.11.0 +rdfdb==0.21.0 +standardservice==0.6.0
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/serv.n3 Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,24 @@ +@prefix : <http://bigasterisk.com/ns/serv#> . +@prefix auth: <http://bigasterisk.com/ns/serv/auth#> . +@prefix serv: <http://bigasterisk.com/services/> . + +serv:rdf_from_mqtt a :Service; + :path "/rdf_from_mqtt/"; + :openid auth:admin; + :serverHost "bang"; + :internalPort 10018; + :prodDockerFlags ( + "-p" "10018:10018" + "--net=host"); + :localDockerFlags ( + "-v" "`pwd`:/opt" + "-v" "/my/proj/homeauto/lib:/lib_src" + ); + :localRunCmdline ( + + "python3" "rdf_from_mqtt.py" "-v" +#"--cs" "living" +); + :dockerFile "Dockerfile" +. +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/rdf_from_mqtt/tasks.py Tue Feb 04 23:33:21 2020 -0800 @@ -0,0 +1,14 @@ +from invoke import task, Collection + +import sys +sys.path.append('/my/proj/release') +from serv_tasks import serv_tasks + +ns = Collection() +serv_tasks(ns, 'serv.n3', 'rdf_from_mqtt') + +@ns.add_task +@task +def tail_mqtt(ctx): + internal_mqtt_port = 10010 + ctx.run(f'mosquitto_sub -h bang -p 1883 -d -v -t \#')