Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 733:9ca69f2be87b
more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
Ignore-this: 85e2ab49915e44b08219e537fab21870
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Feb 2020 04:02:22 -0800 |
parents | service/mqtt_to_rdf/rdf_from_mqtt.py@fdddbdaf07b5 |
children | f3607a373a00 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Feb 08 04:02:22 2020 -0800 @@ -0,0 +1,204 @@ +""" +Subscribe to mqtt topics; generate RDF statements. +""" +import json +import sys +from docopt import docopt +from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD +from rdflib.parser import StringInputSource +from rdflib.term import Node +from twisted.internet import reactor +import cyclone.web +import rx, rx.operators, rx.scheduler.eventloop +from greplin import scales +from greplin.scales.cyclonehandler import StatsHandler + +from export_to_influxdb import InfluxExporter +from mqtt_client import MqttClient + +from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import graphFromQuads +from standardservice.logsetup import log, verboseLogging +from standardservice.scalessetup import gatherProcessStats + +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +gatherProcessStats() + +def parseDurationLiteral(lit: Literal) -> float: + if lit.endswith('s'): + return float(lit.split('s')[0]) + raise NotImplementedError(f'duration literal: {lit}') + + +class MqttStatementSource: + def __init__(self, uri, config, masterGraph, mqtt, influx): + self.uri = uri + self.config = config + self.masterGraph = masterGraph + self.mqtt = mqtt + self.influx = influx + + self.mqttTopic = self.topicFromConfig(self.config) + + statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') + scales.init(self, statPath) + self._mqttStats = scales.collection( + statPath + '/incoming', scales.IntStat('count'), + scales.RecentFpsStat('fps')) + + + rawBytes = self.subscribeMqtt() + rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) + parsed = self.getParser()(rawBytes) + + g = self.config + for conv in g.items(g.value(self.uri, ROOM['conversions'])): + parsed = self.conversionStep(conv)(parsed) + + outputQuadsSets = rx.combine_latest( + *[self.makeQuads(parsed, plan) + for plan in g.objects(self.uri, ROOM['graphStatements'])]) + + outputQuadsSets.subscribe_(self.updateQuads) + + def topicFromConfig(self, config) -> bytes: + topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) + return b'/'.join(t.encode('ascii') for t in topicParts) + + + def subscribeMqtt(self): + return self.mqtt.subscribe(self.mqttTopic) + + def countIncomingMessage(self, _): + self._mqttStats.fps.mark() + self._mqttStats.count += 1 + + def getParser(self): + g = self.config + parser = g.value(self.uri, ROOM['parser']) + if parser == XSD.double: + return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) + elif parser == ROOM['tagIdToUri']: + return rx.operators.map(self.tagIdToUri) + elif parser == ROOM['onOffBrightness']: + return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) + elif parser == ROOM['jsonBrightness']: + return rx.operators.map(self.parseJsonBrightness) + elif ROOM['ValueMap'] in g.objects(parser, RDF.type): + return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) + else: + raise NotImplementedError(parser) + + def parseJsonBrightness(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('ascii')) + return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) + + def conversionStep(self, conv: Node): + g = self.config + if conv == ROOM['celsiusToFarenheit']: + return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) + elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: + threshold = g.value(conv, ROOM['ignoreValueBelow']) + return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) + else: + raise NotImplementedError(conv) + + def makeQuads(self, parsed, plan): + g = self.config + def quadsFromValue(valueNode): + return set([ + (self.uri, + g.value(plan, ROOM['outputPredicate']), + valueNode, + self.uri) + ]) + + def emptyQuads(element): + return set([]) + + quads = rx.operators.map(quadsFromValue)(parsed) + + dur = g.value(plan, ROOM['statementLifetime']) + if dur is not None: + sec = parseDurationLiteral(dur) + quads = quads.pipe( + rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), + rx.operators.map(emptyQuads), + rx.operators.merge(quads), + ) + + return quads + + def updateQuads(self, newGraphs): + newQuads = set.union(*newGraphs) + g = graphFromQuads(newQuads) + log.debug(f'{self.uri} update to {len(newQuads)} statements') + + self.influx.exportToInflux(newQuads) + + self.masterGraph.patchSubgraph(self.uri, g) + + def tagIdToUri(self, value: bytearray) -> URIRef: + justHex = value.decode('ascii').replace('-', '').lower() + int(justHex, 16) # validate + return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') + + def remap(self, parser, valueStr: str): + g = self.config + value = Literal(valueStr) + for entry in g.objects(parser, ROOM['map']): + if value == g.value(entry, ROOM['from']): + return g.value(entry, ROOM['to']) + raise KeyError(value) + + +if __name__ == '__main__': + arg = docopt(""" + Usage: mqtt_to_rdf.py [options] + + -v Verbose + --cs=STR Only process config filenames with this substring + """) + verboseLogging(arg['-v']) + + config = Graph() + for fn in [ + "config_cardreader.n3", + "config_nightlight_ari.n3", + "config_bed_bar.n3", + "config_air_quality_indoor.n3", + "config_air_quality_outdoor.n3", + "config_living_lamps.n3", + "config_kitchen.n3", + ]: + if not arg['--cs'] or arg['--cs'] in fn: + config.parse(fn, format='n3') + + masterGraph = PatchableGraph() + + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', + brokerPort=1883) + influx = InfluxExporter(config) + + srcs = [] + for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): + srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx)) + log.info(f'set up {len(srcs)} sources') + + port = 10018 + reactor.listenTCP(port, cyclone.web.Application([ + (r"/()", cyclone.web.StaticFileHandler, + {"path": ".", "default_filename": "index.html"}), + (r"/build/(bundle.js)", + cyclone.web.StaticFileHandler, {"path": "build"}), + (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), + (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/mqtt/events", CycloneGraphEventsHandler, + {'masterGraph': masterGraph}), + ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), + interface='::') + log.warn('serving on %s', port) + + reactor.run()