Mercurial > code > home > repos > homeauto
changeset 1629:1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
author | drewp@bigasterisk.com |
---|---|
date | Sat, 11 Sep 2021 23:31:32 -0700 |
parents | 24e8cd8fcdcd |
children | b3132cd02686 |
files | service/mqtt_to_rdf/mqtt_to_rdf.py |
diffstat | 1 files changed, 59 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Sep 11 23:28:37 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Sep 11 23:31:32 2021 -0700 @@ -1,26 +1,27 @@ """ Subscribe to mqtt topics; generate RDF statements. """ +import glob +import json +import logging +import os + +from mqtt_message import graphFromMessage import os import time -import json -import logging +from dataclasses import dataclass from pathlib import Path from typing import Callable, Sequence, Set, Tuple, Union, cast -from cyclone.util import ObjectDict -from rdflib.graph import ConjunctiveGraph -from rx.core.typing import Mapper - -from export_to_influxdb import InfluxExporter - +import cyclone.sse import cyclone.web -import cyclone.sse +import export_to_influxdb import prometheus_client import rx import rx.operators import rx.scheduler.eventloop from docopt import docopt +from export_to_influxdb import InfluxExporter from mqtt_client import MqttClient from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) from prometheus_client import Counter, Gauge, Histogram, Summary @@ -28,11 +29,13 @@ from prometheus_client.registry import REGISTRY from rdfdb.rdflibpatch import graphFromQuads from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef +from rdflib.graph import ConjunctiveGraph from rdflib.term import Node from rx.core import Observable +from rx.core.typing import Mapper from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor, task -from dataclasses import dataclass +from inference import Inference from button_events import button_events from patch_cyclone_sse import patchCycloneSse @@ -40,11 +43,12 @@ MESSAGES_SEEN = Counter('mqtt_messages_seen', '') collectors = {} -import export_to_influxdb +patchCycloneSse() + -print(f'merge me back {export_to_influxdb}') - -patchCycloneSse() +def logGraph(debug: Callable, label: str, graph: Graph): + n3 = cast(bytes, graph.serialize(format="n3")) + debug(label + ':\n' + n3.decode('utf8')) def appendLimit(lst, elem, n=10): @@ -210,15 +214,18 @@ return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') -def serializeWithNs(graph: ConjunctiveGraph) -> bytes: +def serializeWithNs(graph: Graph, hidePrefixes=False) -> str: graph.bind('', ROOM) - return cast(bytes, graph.serialize(format='n3')) + n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8') + if hidePrefixes: + n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix')) + return n3 class MqttStatementSource: def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, - influxExport: InfluxExporter): + influxExport: InfluxExporter, inference: Inference): self.uri = uri self.config = config self.masterGraph = masterGraph @@ -226,16 +233,15 @@ self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt self.influxExport = influxExport + self.inference = inference self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') self.debugSub = { 'topic': self.mqttTopic.decode('ascii'), - 'recentMessages': [], - 'recentParsed': [], - 'recentConversions': [], - 'currentMetrics': [], + 'recentMessageGraphs': [], + 'recentMetrics': [], 'currentOutputGraph': { 't': 1, 'n3': "(n3)" @@ -246,18 +252,20 @@ rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) rawBytes.subscribe(on_next=self.countIncomingMessage) - filteredBytes = Filters(uri, config).makeOutputStream(rawBytes) - - parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes) - parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)})) + rawBytes.subscribe_(self.onMessage) - convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs) - convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) + def onMessage(self, raw: bytes): + g = graphFromMessage(self.mqttTopic, raw) + logGraph(log.debug, 'message graph', g) + appendLimit( + self.debugSub['recentMessageGraphs'], + { # + 't': truncTime(), + 'n3': serializeWithNs(g, hidePrefixes=True) + }) - outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) - outputQuadsSets.subscribe_(self.updateInflux) - - outputQuadsSets.subscribe_(self.updateMasterGraph) + implied = self.inference.infer(g) + self.updateMasterGraph(implied) def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) @@ -272,35 +280,30 @@ self.debugPageData['messagesSeen'] += 1 MESSAGES_SEEN.inc() - appendLimit(self.debugSub['recentMessages'], { - 't': truncTime(), - 'msg': msg.decode('ascii'), - }) - def updateInflux(self, newGraphs): for g in newGraphs: self.influxExport.exportToInflux(g) - def updateMasterGraph(self, newGraphs): - newQuads = set.union(*newGraphs) - g = graphFromQuads(newQuads) - log.debug(f'{self.uri} update to {len(newQuads)} statements') + def updateMasterGraph(self, newGraph): + log.debug(f'{self.uri} update to {len(newGraph)} statements') - for quad in newQuads: - meas = quad[0].split('/')[-1] + cg = ConjunctiveGraph() + for stmt in newGraph: + cg.add(stmt + (self.uri,)) + meas = stmt[0].split('/')[-1] if meas.startswith('airQuality'): where_prefix, type_ = meas[len('airQuality'):].split('door') where = where_prefix + 'door' metric = 'air' tags = {'loc': where.lower(), 'type': type_.lower()} - val = quad[2].toPython() + val = stmt[2].toPython() if metric not in collectors: collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) collectors[metric].labels(**tags).set(val) - self.masterGraph.patchSubgraph(self.uri, g) - self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8') + self.masterGraph.patchSubgraph(self.uri, cg) + self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True) class Metrics(cyclone.web.RequestHandler): @@ -346,6 +349,8 @@ verboseLogging(arg['-v']) logging.getLogger('mqtt').setLevel(logging.INFO) logging.getLogger('mqtt_client').setLevel(logging.INFO) + logging.getLogger('infer').setLevel(logging.INFO) + log.info('log start') config = Graph() for fn in Path('.').glob('conf/*.n3'): @@ -372,8 +377,14 @@ influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) + inference = Inference() + inference.setRules(config) + expandedConfig = inference.infer(config) + log.info('expanded config:') + for stmt in sorted(expandedConfig): + log.info(f' {stmt}') srcs = [] - for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): + for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): srcs.append( MqttStatementSource(src, config, @@ -381,7 +392,8 @@ mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData, - influxExport=influxExport)) + influxExport=influxExport, + inference=inference)) log.info(f'set up {len(srcs)} sources') port = 10018