Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 1577:6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
author | drewp@bigasterisk.com |
---|---|
date | Thu, 26 Aug 2021 16:33:05 -0700 |
parents | e0e623c01a69 |
children | b0608eb6e90c |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Thu May 13 01:06:26 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Thu Aug 26 16:33:05 2021 -0700 @@ -1,11 +1,17 @@ """ Subscribe to mqtt topics; generate RDF statements. """ +import os import time import json -from logging import debug +import logging from pathlib import Path -from typing import Callable, cast +from typing import Callable, Sequence, Set, Tuple, Union, cast +from cyclone.util import ObjectDict + +from rx.core.typing import Mapper + +from export_to_influxdb import InfluxExporter import cyclone.web import cyclone.sse @@ -25,13 +31,17 @@ from rx.core import Observable from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor, task - +from dataclasses import dataclass from button_events import button_events ROOM = Namespace('http://projects.bigasterisk.com/room/') collectors = {} +import export_to_influxdb + +print(f'merge me back {export_to_influxdb}') + def appendLimit(lst, elem, n=10): del lst[:len(lst) - n + 1] @@ -44,15 +54,171 @@ raise NotImplementedError(f'duration literal: {lit}') +@dataclass +class StreamPipelineStep: + uri: URIRef # a :MqttStatementSource + config: Graph + + def makeOutputStream(self, inStream: Observable) -> Observable: + return inStream + + +class Filters(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) + if jsonEq: + required = json.loads(jsonEq.toPython()) + + def eq(jsonBytes): + msg = json.loads(jsonBytes.decode('utf8')) + return msg == required + + outStream = rx.operators.filter(eq)(inStream) + else: + outStream = inStream + return outStream + + +class Parser(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + parser = self.getParser() + return parser(inStream) + + def getParser(self) -> Callable[[Observable], Observable]: + parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser'])) + func = self.getParserFunc(parserType) + return rx.operators.map(cast(Mapper, func)) + + def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]: + if parserType == XSD.double: + return lambda v: Literal(float(v)) + elif parserType == ROOM['tagIdToUri']: + return self.tagIdToUri + elif parserType == ROOM['onOffBrightness']: + return lambda v: Literal(0.0 if v == b'OFF' else 1.0) + elif parserType == ROOM['jsonBrightness']: + return self.parseJsonBrightness + elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type): + return lambda v: self.remap(parserType, v.decode('utf8')) + elif parserType == ROOM['rfCode']: + return self.parseJsonRfCode + elif parserType == ROOM['tradfri']: + return self.parseTradfriMessage + else: + raise NotImplementedError(parserType) + + def tagIdToUri(self, value: bytes) -> URIRef: + justHex = value.decode('utf8').replace('-', '').lower() + int(justHex, 16) # validate + return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') + + def parseJsonBrightness(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('utf8')) + return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) + + def remap(self, parser, valueStr: str) -> Node: + g = self.config + value = Literal(valueStr) + for entry in g.objects(parser, ROOM['map']): + if value == g.value(entry, ROOM['from']): + to_ = g.value(entry, ROOM['to']) + if not isinstance(to_, Node): + raise TypeError(f'{to_=}') + return to_ + raise KeyError(value) + + def parseJsonRfCode(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('utf8')) + return Literal('%08x%08x' % (msg['code0'], msg['code1'])) + + def parseTradfriMessage(self, mqttValue: bytes) -> Node: + log.info(f'trad {mqttValue}') + return Literal('todo') + + +class Converters(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + out = inStream + g = self.config + for conv in g.items(g.value(self.uri, ROOM['conversions'])): + out = self.conversionStep(conv)(out) + return out + + def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: + g = self.config + if conv == ROOM['celsiusToFarenheit']: + + return rx.operators.map(cast(Mapper, self.c2f)) + elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: + threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython() + return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold) + elif conv == ROOM['buttonPress']: + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) + else: + raise NotImplementedError(conv) + + def c2f(self, value: Literal) -> Node: + return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) + + +class Rdfizer(StreamPipelineStep): + + def makeOutputStream(self, inStream: Observable) -> Observable: + outputQuadsSets = rx.combine_latest( + *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])]) + return outputQuadsSets + + def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable: + + def quadsFromValue(valueNode): + return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) + + def emptyQuads(element) -> Set[Tuple]: + return set([]) + + quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream) + + dur = self.config.value(plan, ROOM['statementLifetime']) + if dur is not None: + sec = parseDurationLiteral(dur) + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + quads = quads.pipe( + rx.operators.debounce(sec, loop), + rx.operators.map(cast(Mapper, emptyQuads)), + rx.operators.merge(quads), + ) + + return quads + + +def truncTime(): + return round(time.time(), 3) + + +def tightN3(node: Union[URIRef, Literal]) -> str: + return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') + + +def serializeWithNs(graph: PatchableGraph) -> bytes: + graph._graph.bind('', 'http://projects.bigasterisk.com/room/') + return cast(bytes, graph.serialize(format='n3')) + + class MqttStatementSource: - def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, + influxExport: InfluxExporter): self.uri = uri self.config = config self.masterGraph = masterGraph self.debugPageData = debugPageData self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt + self.influxExport = influxExport self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') @@ -71,36 +237,25 @@ self.debugPageData['subscribed'].append(self.debugSub) rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) - # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) rawBytes.subscribe(on_next=self.countIncomingMessage) - # rawBytes = self.addFilters(rawBytes) - # parsed = self.getParser()(rawBytes) - # g = self.config - # for conv in g.items(g.value(self.uri, ROOM['conversions'])): - # parsed = self.conversionStep(conv)(parsed) + filteredBytes = Filters(uri, config).makeOutputStream(rawBytes) - # outputQuadsSets = rx.combine_latest( *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) + parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes) + parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)})) - # outputQuadsSets.subscribe_(self.updateQuads) - - def addFilters(self, rawBytes): - jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) - if jsonEq: - required = json.loads(jsonEq.toPython()) + convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs) + convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) - def eq(jsonBytes): - msg = json.loads(jsonBytes.decode('ascii')) - return msg == required + outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) - rawBytes = rx.operators.filter(eq)(rawBytes) - return rawBytes + outputQuadsSets.subscribe_(self.updateMasterGraph) def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self, topic): + def subscribeMqtt(self, topic: bytes): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt return mqtt.subscribe(topic) @@ -109,78 +264,11 @@ self.debugPageData['messagesSeen'] += 1 appendLimit(self.debugSub['recentMessages'], { - 't': round(time.time(), 3), + 't': truncTime(), 'msg': msg.decode('ascii'), }) - def getParser(self): - g = self.config - - parser = g.value(self.uri, ROOM['parser']) - if parser == XSD.double: - return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) - elif parser == ROOM['tagIdToUri']: - return rx.operators.map(self.tagIdToUri) - elif parser == ROOM['onOffBrightness']: - return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) - elif parser == ROOM['jsonBrightness']: - return rx.operators.map(self.parseJsonBrightness) - elif ROOM['ValueMap'] in g.objects(parser, RDF.type): - return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) - elif parser == ROOM['rfCode']: - return rx.operators.map(self.parseJsonRfCode) - else: - raise NotImplementedError(parser) - - def parseJsonBrightness(self, mqttValue: bytes): - msg = json.loads(mqttValue.decode('ascii')) - return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) - - def parseJsonRfCode(self, mqttValue: bytes): - msg = json.loads(mqttValue.decode('ascii')) - return Literal('%08x%08x' % (msg['code0'], msg['code1'])) - - def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: - g = self.config - if conv == ROOM['celsiusToFarenheit']: - - def c2f(value: Literal) -> Node: - return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) - - return rx.operators.map(c2f) - elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: - threshold = g.value(conv, ROOM['ignoreValueBelow']) - return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) - elif conv == ROOM['buttonPress']: - loop = rx.scheduler.eventloop.TwistedScheduler(reactor) - return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) - else: - raise NotImplementedError(conv) - - def makeQuads(self, parsed, plan): - g = self.config - - def quadsFromValue(valueNode): - return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) - - def emptyQuads(element): - return set([]) - - quads = rx.operators.map(quadsFromValue)(parsed) - - dur = g.value(plan, ROOM['statementLifetime']) - if dur is not None: - sec = parseDurationLiteral(dur) - loop = rx.scheduler.eventloop.TwistedScheduler(reactor) - quads = quads.pipe( - rx.operators.debounce(sec, loop), - rx.operators.map(emptyQuads), - rx.operators.merge(quads), - ) - - return quads - - def updateQuads(self, newGraphs): + def updateMasterGraph(self, newGraphs): newQuads = set.union(*newGraphs) g = graphFromQuads(newQuads) log.debug(f'{self.uri} update to {len(newQuads)} statements') @@ -199,19 +287,7 @@ collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, g) - - def tagIdToUri(self, value: bytearray) -> URIRef: - justHex = value.decode('ascii').replace('-', '').lower() - int(justHex, 16) # validate - return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') - - def remap(self, parser, valueStr: str): - g = self.config - value = Literal(valueStr) - for entry in g.objects(parser, ROOM['map']): - if value == g.value(entry, ROOM['from']): - return g.value(entry, ROOM['to']) - raise KeyError(value) + self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8') class Metrics(cyclone.web.RequestHandler): @@ -232,8 +308,8 @@ dpd = self.settings.debugPageData js = json.dumps(dpd, sort_keys=True) if js != self.lastSent: - print('sending dpd update') - self.sendEvent(message=js) + log.debug('sending dpd update') + self.sendEvent(message=js.encode('utf8')) self.lastSent = js except Exception: import traceback @@ -255,6 +331,8 @@ --cs=STR Only process config filenames with this substring """) verboseLogging(arg['-v']) + logging.getLogger('mqtt').setLevel(logging.INFO) + logging.getLogger('mqtt_client').setLevel(logging.INFO) config = Graph() for fn in Path('.').glob('conf/*.n3'): @@ -279,10 +357,18 @@ mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) + influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) + srcs = [] for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): srcs.append( - MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData)) + MqttStatementSource(src, + config, + masterGraph, + mqtt=mqtt, + internalMqtt=internalMqtt, + debugPageData=debugPageData, + influxExport=influxExport)) log.info(f'set up {len(srcs)} sources') port = 10018 @@ -310,6 +396,6 @@ debugPageData=debugPageData, debug=arg['-v']), interface='::') - log.warn('serving on %s', port) + log.info('serving on %s', port) reactor.run()