Mercurial > code > home > repos > homeauto
view service/mqtt_to_rdf/mqtt_to_rdf.py @ 787:13970578a443
fix mqtt_to_rdf influx hostname
author | drewp@bigasterisk.com |
---|---|
date | Thu, 10 Sep 2020 15:00:18 -0700 |
parents | 729ab70c6212 |
children | 8f4e814eb1ab |
line wrap: on
line source
""" Subscribe to mqtt topics; generate RDF statements. """ import json from pathlib import Path import cyclone.web from docopt import docopt from export_to_influxdb import InfluxExporter from greplin import scales from greplin.scales.cyclonehandler import StatsHandler from mqtt_client import MqttClient from patchablegraph import ( CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph, ) from rdfdb.rdflibpatch import graphFromQuads from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD from rdflib.term import Node import rx import rx.operators import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging from standardservice.scalessetup import gatherProcessStats from twisted.internet import reactor ROOM = Namespace('http://projects.bigasterisk.com/room/') gatherProcessStats() def parseDurationLiteral(lit: Literal) -> float: if lit.endswith('s'): return float(lit.split('s')[0]) raise NotImplementedError(f'duration literal: {lit}') class MqttStatementSource: def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt self.influx = influx self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') scales.init(self, statPath) self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) parsed = self.getParser()(rawBytes) g = self.config for conv in g.items(g.value(self.uri, ROOM['conversions'])): parsed = self.conversionStep(conv)(parsed) outputQuadsSets = rx.combine_latest( *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) outputQuadsSets.subscribe_(self.updateQuads) def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) def subscribeMqtt(self, topic): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt return mqtt.subscribe(topic) def countIncomingMessage(self, _): self._mqttStats.fps.mark() self._mqttStats.count += 1 def getParser(self): g = self.config parser = g.value(self.uri, ROOM['parser']) if parser == XSD.double: return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) elif parser == ROOM['tagIdToUri']: return rx.operators.map(self.tagIdToUri) elif parser == ROOM['onOffBrightness']: return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) elif parser == ROOM['jsonBrightness']: return rx.operators.map(self.parseJsonBrightness) elif ROOM['ValueMap'] in g.objects(parser, RDF.type): return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) else: raise NotImplementedError(parser) def parseJsonBrightness(self, mqttValue: bytes): msg = json.loads(mqttValue.decode('ascii')) return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) def conversionStep(self, conv: Node): g = self.config if conv == ROOM['celsiusToFarenheit']: return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: threshold = g.value(conv, ROOM['ignoreValueBelow']) return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) else: raise NotImplementedError(conv) def makeQuads(self, parsed, plan): g = self.config def quadsFromValue(valueNode): return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) def emptyQuads(element): return set([]) quads = rx.operators.map(quadsFromValue)(parsed) dur = g.value(plan, ROOM['statementLifetime']) if dur is not None: sec = parseDurationLiteral(dur) quads = quads.pipe( rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), rx.operators.map(emptyQuads), rx.operators.merge(quads), ) return quads def updateQuads(self, newGraphs): newQuads = set.union(*newGraphs) g = graphFromQuads(newQuads) log.debug(f'{self.uri} update to {len(newQuads)} statements') self.influx.exportToInflux(newQuads) self.masterGraph.patchSubgraph(self.uri, g) def tagIdToUri(self, value: bytearray) -> URIRef: justHex = value.decode('ascii').replace('-', '').lower() int(justHex, 16) # validate return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') def remap(self, parser, valueStr: str): g = self.config value = Literal(valueStr) for entry in g.objects(parser, ROOM['map']): if value == g.value(entry, ROOM['from']): return g.value(entry, ROOM['to']) raise KeyError(value) if __name__ == '__main__': arg = docopt(""" Usage: mqtt_to_rdf.py [options] -v Verbose --cs=STR Only process config filenames with this substring """) verboseLogging(arg['-v']) config = Graph() for fn in Path('.').glob('config_*.n3'): if not arg['--cs'] or str(arg['--cs']) in str(fn): log.debug(f'loading {fn}') config.parse(str(fn), format='n3') else: log.debug(f'skipping {fn}') masterGraph = PatchableGraph() mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-frontdoor.default.svc.cluster.local', brokerPort=10210) influx = InfluxExporter(config, influxHost='influxdb.default.svc.cluster.local') srcs = [] for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) log.info(f'set up {len(srcs)} sources') port = 10018 reactor.listenTCP(port, cyclone.web.Application([ (r"/()", cyclone.web.StaticFileHandler, { "path": ".", "default_filename": "index.html" }), (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { "path": "build" }), (r'/stats/(.*)', StatsHandler, { 'serverName': 'mqtt_to_rdf' }), (r"/graph/mqtt", CycloneGraphHandler, { 'masterGraph': masterGraph }), (r"/graph/mqtt/events", CycloneGraphEventsHandler, { 'masterGraph': masterGraph }), ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), interface='::') log.warn('serving on %s', port) reactor.run()