Mercurial > code > home > repos > homeauto
view service/mqtt_to_rdf/mqtt_to_rdf.py @ 798:cdc76c84e3e2
move conf into subdir
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Dec 2020 21:05:32 -0800 |
parents | a3e430b39177 |
children | e0e623c01a69 |
line wrap: on
line source
"""
Subscribe to mqtt topics; generate RDF statements.
"""
import json
from pathlib import Path
from typing import Callable, cast

import cyclone.web
import prometheus_client
import rx
import rx.operators
import rx.scheduler.eventloop
from docopt import docopt
from mqtt_client import MqttClient
from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
from prometheus_client import Counter, Gauge, Histogram, Summary
from prometheus_client.exposition import generate_latest
from prometheus_client.registry import REGISTRY
from rdfdb.rdflibpatch import graphFromQuads
from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
from rdflib.term import Node
from rx.core import Observable
from standardservice.logsetup import log, verboseLogging
from twisted.internet import reactor

from button_events import button_events

ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Prometheus collectors created lazily by MqttStatementSource.updateQuads,
# keyed by metric name.
collectors = {}


def parseDurationLiteral(lit: Literal) -> float:
    """Return the number of seconds described by a literal such as '5s'.

    Only values ending in 's' are understood; anything else raises
    NotImplementedError.
    """
    if lit.endswith('s'):
        return float(lit.split('s')[0])
    raise NotImplementedError(f'duration literal: {lit}')


class MqttStatementSource:
    """One configured mqtt topic whose messages are turned into RDF quads.

    The constructor reads the description of `uri` from `config`, subscribes
    to the topic, and wires an rx pipeline (filter -> parse -> conversions ->
    quads) whose output is patched into `masterGraph`.
    """

    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt):
        self.uri = uri
        self.config = config
        self.masterGraph = masterGraph
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt

        self.mqttTopic = self.topicFromConfig(self.config)
        log.debug(f'new mqttTopic {self.mqttTopic}')

        statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
        #scales.init(self, statPath)
        #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))

        rawBytes = self.subscribeMqtt(self.mqttTopic)
        rawBytes = self.addFilters(rawBytes)
        rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
        parsed = self.getParser()(rawBytes)

        g = self.config
        # Apply each configured conversion, in list order, to the parsed stream.
        for conv in g.items(g.value(self.uri, ROOM['conversions'])):
            parsed = self.conversionStep(conv)(parsed)

        # One quad stream per room:graphStatements plan; updateQuads receives
        # the latest quad set from every plan together.
        outputQuadsSets = rx.combine_latest(
            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])

        outputQuadsSets.subscribe_(self.updateQuads)

    def addFilters(self, rawBytes):
        """Apply the optional room:filterPayloadJsonEquals filter to the stream.

        When the config gives a JSON value for this source, only messages
        whose decoded JSON payload equals that value pass through.
        """
        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if jsonEq:
            required = json.loads(jsonEq.toPython())

            def eq(jsonBytes):
                msg = json.loads(jsonBytes.decode('ascii'))
                return msg == required

            rawBytes = rx.operators.filter(eq)(rawBytes)
        return rawBytes

    def topicFromConfig(self, config) -> bytes:
        """Join the items of this source's room:mqttTopic list with '/'."""
        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)

    def subscribeMqtt(self, topic):
        """Subscribe to `topic` on whichever broker client serves it."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, _):
        """Per-message hook; currently a no-op (stats collection is disabled)."""
        pass  #self._mqttStats.fps.mark()
        #self._mqttStats.count += 1

    def getParser(self):
        """Return the rx operator that maps raw payload bytes to an RDF node.

        The choice is driven by this source's room:parser value; an
        unrecognized parser raises NotImplementedError.
        """
        g = self.config
        parser = g.value(self.uri, ROOM['parser'])

        if parser == XSD.double:
            return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
        elif parser == ROOM['tagIdToUri']:
            return rx.operators.map(self.tagIdToUri)
        elif parser == ROOM['onOffBrightness']:
            return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
        elif parser == ROOM['jsonBrightness']:
            return rx.operators.map(self.parseJsonBrightness)
        elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
            return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
        elif parser == ROOM['rfCode']:
            return rx.operators.map(self.parseJsonRfCode)
        else:
            raise NotImplementedError(parser)

    def parseJsonBrightness(self, mqttValue: bytes):
        """Map a JSON {'state', 'brightness'} payload to a 0.0-1.0 literal.

        Brightness is divided by 255; any state other than 'ON' yields 0.0.
        """
        msg = json.loads(mqttValue.decode('ascii'))
        return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Format the payload's code0 and code1 as one 16-hex-digit literal."""
        msg = json.loads(mqttValue.decode('ascii'))
        return Literal('%08x%08x' % (msg['code0'], msg['code1']))

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the rx operator implementing conversion node `conv`.

        Raises NotImplementedError for a conversion that is not recognized.
        """
        g = self.config
        if conv == ROOM['celsiusToFarenheit']:

            def c2f(value: Literal) -> Node:
                return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))

            return rx.operators.map(c2f)

        # Look the threshold up once and convert it once, rather than on
        # every message that flows through the filter.
        threshold = g.value(conv, ROOM['ignoreValueBelow'], default=None)
        if threshold is not None:
            minimum = threshold.toPython()
            return rx.operators.filter(lambda value: value.toPython() >= minimum)

        if conv == ROOM['buttonPress']:
            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)

        raise NotImplementedError(conv)

    def makeQuads(self, parsed, plan):
        """Return a stream of quad sets built from `parsed` values for `plan`.

        Each value becomes a one-quad set (subject and graph are this
        source's uri, predicate is the plan's room:outputPredicate). If the
        plan has a room:statementLifetime, an empty set is also emitted once
        that many seconds pass without a new value.
        """
        g = self.config

        def quadsFromValue(valueNode):
            return {(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)}

        def emptyQuads(element):
            return set()

        quads = rx.operators.map(quadsFromValue)(parsed)

        dur = g.value(plan, ROOM['statementLifetime'])
        if dur is not None:
            sec = parseDurationLiteral(dur)
            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
            quads = quads.pipe(
                rx.operators.debounce(sec, loop),
                rx.operators.map(emptyQuads),
                rx.operators.merge(quads),
            )

        return quads

    def updateQuads(self, newGraphs):
        """Publish the union of `newGraphs` as this source's subgraph.

        Quads whose subject name starts with 'airQuality' are also exported
        as an 'air' prometheus gauge labelled by location and type.
        """
        newQuads = set.union(*newGraphs)
        g = graphFromQuads(newQuads)
        log.debug(f'{self.uri} update to {len(newQuads)} statements')

        for quad in newQuads:
            # Last path segment of the subject uri names the measurement.
            meas = quad[0].split('/')[-1]
            if meas.startswith('airQuality'):
                # The text before 'door' (plus 'door') is the location; the
                # text after it is the measurement type.
                where_prefix, type_ = meas[len('airQuality'):].split('door')
                where = where_prefix + 'door'
                metric = 'air'
                tags = {'loc': where.lower(), 'type': type_.lower()}
                val = quad[2].toPython()
                if metric not in collectors:
                    collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

                collectors[metric].labels(**tags).set(val)

        self.masterGraph.patchSubgraph(self.uri, g)

    def tagIdToUri(self, value: bytearray) -> URIRef:
        """Turn a hex tag id payload (dashes allowed) into an rfidCard uri.

        Raises ValueError if the payload is not valid hex.
        """
        justHex = value.decode('ascii').replace('-', '').lower()
        int(justHex, 16)  # validate
        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')

    def remap(self, parser, valueStr: str):
        """Translate `valueStr` through the room:map entries of `parser`.

        Returns the room:to node of the first entry whose room:from equals
        the value; raises KeyError when no entry matches.
        """
        g = self.config
        value = Literal(valueStr)
        for entry in g.objects(parser, ROOM['map']):
            if value == g.value(entry, ROOM['from']):
                return g.value(entry, ROOM['to'])
        raise KeyError(value)


class Metrics(cyclone.web.RequestHandler):
    """Serve the prometheus registry in text exposition format."""

    def get(self):
        # set_header replaces the handler's default Content-Type; add_header
        # would send a second Content-Type header next to the default one.
        self.set_header('Content-Type', 'text/plain')
        self.write(generate_latest(REGISTRY))


if __name__ == '__main__':
    arg = docopt("""
    Usage: mqtt_to_rdf.py [options]

    -v        Verbose
    --cs=STR  Only process config filenames with this substring
    """)
    verboseLogging(arg['-v'])

    config = Graph()
    for fn in Path('.').glob('conf/*.n3'):
        if not arg['--cs'] or str(arg['--cs']) in str(fn):
            log.debug(f'loading {fn}')
            config.parse(str(fn), format='n3')
        else:
            log.debug(f'skipping {fn}')

    masterGraph = PatchableGraph()

    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local',
                      brokerPort=1883)  # deprecated
    internalMqtt = MqttClient(clientId='mqtt_to_rdf',
                              brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
                              brokerPort=10210)

    srcs = []
    for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt))
    log.info(f'set up {len(srcs)} sources')

    port = 10018
    reactor.listenTCP(port,
                      cyclone.web.Application([
                          (r"/()", cyclone.web.StaticFileHandler, {
                              "path": ".",
                              "default_filename": "index.html"
                          }),
                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
                              "path": "build"
                          }),
                          (r"/graph/mqtt", CycloneGraphHandler, {
                              'masterGraph': masterGraph
                          }),
                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
                              'masterGraph': masterGraph
                          }),
                          (r'/metrics', Metrics),
                      ],
                                              mqtt=mqtt,
                                              internalMqtt=internalMqtt,
                                              masterGraph=masterGraph,
                                              debug=arg['-v']),
                      interface='::')
    # Logger.warn is a deprecated alias; warning() is the supported name.
    log.warning('serving on %s', port)

    reactor.run()