changeset 1629:1c36ad1eb8b3

do inference on config. backend for new ui columns. rm some of the old filter pipeline
author drewp@bigasterisk.com
date Sat, 11 Sep 2021 23:31:32 -0700
parents 24e8cd8fcdcd
children b3132cd02686
files service/mqtt_to_rdf/mqtt_to_rdf.py
diffstat 1 files changed, 59 insertions(+), 47 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Sep 11 23:28:37 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Sep 11 23:31:32 2021 -0700
@@ -1,26 +1,27 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
+import glob
+import json
+import logging
+import os
+
+from mqtt_message import graphFromMessage
 import os
 import time
-import json
-import logging
+from dataclasses import dataclass
 from pathlib import Path
 from typing import Callable, Sequence, Set, Tuple, Union, cast
-from cyclone.util import ObjectDict
-from rdflib.graph import ConjunctiveGraph
 
-from rx.core.typing import Mapper
-
-from export_to_influxdb import InfluxExporter
-
+import cyclone.sse
 import cyclone.web
-import cyclone.sse
+import export_to_influxdb
 import prometheus_client
 import rx
 import rx.operators
 import rx.scheduler.eventloop
 from docopt import docopt
+from export_to_influxdb import InfluxExporter
 from mqtt_client import MqttClient
 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
 from prometheus_client import Counter, Gauge, Histogram, Summary
@@ -28,11 +29,13 @@
 from prometheus_client.registry import REGISTRY
 from rdfdb.rdflibpatch import graphFromQuads
 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
+from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node
 from rx.core import Observable
+from rx.core.typing import Mapper
 from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor, task
-from dataclasses import dataclass
+from inference import Inference
 from button_events import button_events
 from patch_cyclone_sse import patchCycloneSse
 
@@ -40,11 +43,12 @@
 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
 collectors = {}
 
-import export_to_influxdb
+patchCycloneSse()
+
 
-print(f'merge me back {export_to_influxdb}')
-
-patchCycloneSse()
+def logGraph(debug: Callable, label: str, graph: Graph):
+    n3 = cast(bytes, graph.serialize(format="n3"))
+    debug(label + ':\n' + n3.decode('utf8'))
 
 
 def appendLimit(lst, elem, n=10):
@@ -210,15 +214,18 @@
     return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
 
 
-def serializeWithNs(graph: ConjunctiveGraph) -> bytes:
+def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
     graph.bind('', ROOM)
-    return cast(bytes, graph.serialize(format='n3'))
+    n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8')
+    if hidePrefixes:
+        n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
+    return n3
 
 
 class MqttStatementSource:
 
     def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
-                 influxExport: InfluxExporter):
+                 influxExport: InfluxExporter, inference: Inference):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
@@ -226,16 +233,15 @@
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
         self.influxExport = influxExport
+        self.inference = inference
 
         self.mqttTopic = self.topicFromConfig(self.config)
         log.debug(f'new mqttTopic {self.mqttTopic}')
 
         self.debugSub = {
             'topic': self.mqttTopic.decode('ascii'),
-            'recentMessages': [],
-            'recentParsed': [],
-            'recentConversions': [],
-            'currentMetrics': [],
+            'recentMessageGraphs': [],
+            'recentMetrics': [],
             'currentOutputGraph': {
                 't': 1,
                 'n3': "(n3)"
@@ -246,18 +252,20 @@
         rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
         rawBytes.subscribe(on_next=self.countIncomingMessage)
 
-        filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)
-
-        parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
-        parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))
+        rawBytes.subscribe_(self.onMessage)
 
-        convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
-        convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
+    def onMessage(self, raw: bytes):
+        g = graphFromMessage(self.mqttTopic, raw)
+        logGraph(log.debug, 'message graph', g)
+        appendLimit(
+            self.debugSub['recentMessageGraphs'],
+            {  #
+                't': truncTime(),
+                'n3': serializeWithNs(g, hidePrefixes=True)
+            })
 
-        outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
-        outputQuadsSets.subscribe_(self.updateInflux)
-
-        outputQuadsSets.subscribe_(self.updateMasterGraph)
+        implied = self.inference.infer(g)
+        self.updateMasterGraph(implied)
 
     def topicFromConfig(self, config) -> bytes:
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
@@ -272,35 +280,30 @@
         self.debugPageData['messagesSeen'] += 1
         MESSAGES_SEEN.inc()
 
-        appendLimit(self.debugSub['recentMessages'], {
-            't': truncTime(),
-            'msg': msg.decode('ascii'),
-        })
-
     def updateInflux(self, newGraphs):
         for g in newGraphs:
             self.influxExport.exportToInflux(g)
 
-    def updateMasterGraph(self, newGraphs):
-        newQuads = set.union(*newGraphs)
-        g = graphFromQuads(newQuads)
-        log.debug(f'{self.uri} update to {len(newQuads)} statements')
+    def updateMasterGraph(self, newGraph):
+        log.debug(f'{self.uri} update to {len(newGraph)} statements')
 
-        for quad in newQuads:
-            meas = quad[0].split('/')[-1]
+        cg = ConjunctiveGraph()
+        for stmt in newGraph:
+            cg.add(stmt + (self.uri,))
+            meas = stmt[0].split('/')[-1]
             if meas.startswith('airQuality'):
                 where_prefix, type_ = meas[len('airQuality'):].split('door')
                 where = where_prefix + 'door'
                 metric = 'air'
                 tags = {'loc': where.lower(), 'type': type_.lower()}
-                val = quad[2].toPython()
+                val = stmt[2].toPython()
                 if metric not in collectors:
                     collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
 
                 collectors[metric].labels(**tags).set(val)
 
-        self.masterGraph.patchSubgraph(self.uri, g)
-        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8')
+        self.masterGraph.patchSubgraph(self.uri, cg)
+        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
 
 
 class Metrics(cyclone.web.RequestHandler):
@@ -346,6 +349,8 @@
     verboseLogging(arg['-v'])
     logging.getLogger('mqtt').setLevel(logging.INFO)
     logging.getLogger('mqtt_client').setLevel(logging.INFO)
+    logging.getLogger('infer').setLevel(logging.INFO)
+    log.info('log start')
 
     config = Graph()
     for fn in Path('.').glob('conf/*.n3'):
@@ -372,8 +377,14 @@
 
     influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
 
+    inference = Inference()
+    inference.setRules(config)
+    expandedConfig = inference.infer(config)
+    log.info('expanded config:')
+    for stmt in sorted(expandedConfig):
+        log.info(f'  {stmt}')
     srcs = []
-    for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
+    for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
         srcs.append(
             MqttStatementSource(src,
                                 config,
@@ -381,7 +392,8 @@
                                 mqtt=mqtt,
                                 internalMqtt=internalMqtt,
                                 debugPageData=debugPageData,
-                                influxExport=influxExport))
+                                influxExport=influxExport,
+                                inference=inference))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018