changeset 1706:2085ed9cfcc4

reworking UI to reflect the new inferencing code
author drewp@bigasterisk.com
date Sat, 23 Oct 2021 13:22:40 -0700
parents 250f4c27d56f
children 4a6fffe6a994
files service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_message.py service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/src/index.ts
diffstat 4 files changed, 120 insertions(+), 114 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/Dockerfile	Sat Oct 23 13:21:06 2021 -0700
+++ b/service/mqtt_to_rdf/Dockerfile	Sat Oct 23 13:22:40 2021 -0700
@@ -14,6 +14,6 @@
 COPY src/ ./src
 RUN pnpm build
 
-COPY *.py *.html ./
+COPY *.py index.html ./
 
 CMD [ "python3", "./mqtt_to_rdf.py", "-v", "--cs=rules" ]
--- a/service/mqtt_to_rdf/mqtt_message.py	Sat Oct 23 13:21:06 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_message.py	Sat Oct 23 13:22:40 2021 -0700
@@ -8,10 +8,12 @@
 JSON = Namespace('http://bigasterisk.com/anyJson/')
 
 
-def graphFromMessage(topic: bytes, body: bytes):
+def graphFromMessage(topicUri: URIRef, topic: bytes, body: bytes):
     graph = Graph()
     message = URIRef(f'{uuid.uuid1().urn}')
 
+    graph.add((topicUri, ROOM['message'], message))
+
     graph.add((message, RDF.type, ROOM['MqttMessage']))
 
     topicSegments = BNode()
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Oct 23 13:21:06 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Oct 23 13:22:40 2021 -0700
@@ -13,7 +13,7 @@
 import time
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Callable, Sequence, Set, Tuple, Union, cast
+from typing import Callable, Set, Tuple, Union, cast
 
 import cyclone.sse
 import cyclone.web
@@ -29,11 +29,10 @@
 from prometheus_client import Counter, Gauge, Histogram, Summary
 from prometheus_client.exposition import generate_latest
 from prometheus_client.registry import REGISTRY
-from rdfdb.rdflibpatch import graphFromQuads
 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
 from rdflib.graph import ConjunctiveGraph
 from rdflib.term import Node
-from rx.core import Observable
+from rx.core.observable.observable import Observable
 from rx.core.typing import Mapper
 from standardservice.logsetup import log, verboseLogging
 from twisted.internet import reactor, task
@@ -230,18 +229,19 @@
 
 class MqttStatementSource:
 
-    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
-                 influxExport: InfluxExporter, inference: Inference):
+    def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, 
+    # influxExport: InfluxExporter,
+                 inference: Inference):
         self.uri = uri
-        self.config = config
+
         self.masterGraph = masterGraph
         self.debugPageData = debugPageData
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
-        self.influxExport = influxExport
+        # self.influxExport = influxExport
         self.inference = inference
 
-        self.mqttTopic = self.topicFromConfig(self.config)
+        self.mqttTopic = topic
         if self.mqttTopic == b'':
             raise EmptyTopicError(f"empty topic for {uri=}")
         log.debug(f'new mqttTopic {self.mqttTopic}')
@@ -263,7 +263,7 @@
         rawBytes.subscribe_(self.onMessage)
 
     def onMessage(self, raw: bytes):
-        g = graphFromMessage(self.mqttTopic, raw)
+        g = graphFromMessage(self.uri, self.mqttTopic, raw)
         logGraph(log.debug, 'message graph', g)
         appendLimit(
             self.debugSub['recentMessageGraphs'],
@@ -275,10 +275,6 @@
         implied = self.inference.infer(g)
         self.updateMasterGraph(implied)
 
-    def topicFromConfig(self, config) -> bytes:
-        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
-        return b'/'.join(t.encode('ascii') for t in topicParts)
-
     def subscribeMqtt(self, topic: bytes):
         # goal is to get everyone on the internal broker and eliminate this
         mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
@@ -347,31 +343,70 @@
         self.loop.stop()
 
 
-@dataclass
-class WatchFiles:
-    # could be merged with rdfdb.service and GraphFile code
-    globPattern: str
-    outGraph: Graph
+# @dataclass
+# class WatchFiles:
+#     # could be merged with rdfdb.service and GraphFile code
+#     globPattern: str
+#     outGraph: Graph
+
+#     def __post_init__(self):
+#         self.lastUpdate = 0
+#         task.LoopingCall(self.refresh).start(1)
+#         log.info(f'start watching {self.globPattern}')
 
-    def __post_init__(self):
-        self.lastUpdate = 0
-        task.LoopingCall(self.refresh).start(1)
-        log.info(f'start watching {self.globPattern}')
+#     def refresh(self):
+#         files = glob.glob(self.globPattern)
+#         for fn in files:
+#             if os.path.getmtime(fn) > self.lastUpdate:
+#                 break
+#         else:
+#             return
+#         self.lastUpdate = time.time()
+#         self.outGraph.remove((None, None, None))
+#         log.info('reread config')
+#         for fn in files:
+#             # todo: handle read errors
+#             self.outGraph.parse(fn, format='n3')
+#         # and notify this change,so we can recalc the latest output
+
 
-    def refresh(self):
-        files = glob.glob(self.globPattern)
-        for fn in files:
-            if os.path.getmtime(fn) > self.lastUpdate:
-                break
-        else:
-            return
-        self.lastUpdate = time.time()
-        self.outGraph.remove((None, None, None))
-        log.info('reread config')
-        for fn in files:
-            # todo: handle read errors
-            self.outGraph.parse(fn, format='n3')
-        # and notify this change,so we can recalc the latest output
+class RunState:
+    """this is rebuilt upon every config reload"""
+    def __init__(self,
+                 expandedConfigPatchableCopy: PatchableGraph,  # for output and display
+                 masterGraph: PatchableGraph,  # current sensor outputs
+                 mqtt: MqttClient,
+                 internalMqtt: MqttClient,
+                 #  influxExport: InfluxExporter,
+                 inference: Inference):
+        loadedConfig = ConjunctiveGraph()
+        loadedConfig.parse('conf/rules.n3', format='n3')
+
+        inference.setRules(loadedConfig)
+        self.expandedConfig = inference.infer(loadedConfig)
+        self.expandedConfig += inference.nonRuleStatements()
+
+        ecWithQuads = ConjunctiveGraph()
+        for s, p, o in self.expandedConfig:
+            ecWithQuads.add((s, p, o, URIRef('/config')))
+        expandedConfigPatchableCopy.setToGraph(ecWithQuads)
+
+        self.srcs = []
+        for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
+            log.info(f'setup source {src=}')
+            try:
+                self.srcs.append(
+                    MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
+                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData,
+                                        # influxExport=influxExport, 
+                                        inference=inference))
+            except EmptyTopicError:
+                continue
+        log.info(f'set up {len(self.srcs)} sources')
+
+    def topicFromConfig(self, config, src) -> bytes:
+        topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
+        return b'/'.join(t.encode('ascii') for t in topicParts)
 
 
 if __name__ == '__main__':
@@ -389,87 +424,47 @@
     # log.setLevel(logging.DEBUG)
     log.info('log start')
 
-    config = ConjunctiveGraph()
-    watcher = WatchFiles('conf/rules.n3', config)
-    # for fn in Path('.').glob('conf/*.n3'):
-    #     if not arg['--cs'] or str(arg['--cs']) in str(fn):
-    #         log.debug(f'loading {fn}')
-    #         config.parse(str(fn), format='n3')
-    #     else:
-    #         log.debug(f'skipping {fn}')
-
+    # config = ConjunctiveGraph()
+    # watcher = WatchFiles('conf/rules.n3', config)
     masterGraph = PatchableGraph()
+    inference = Inference()
 
     brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
     brokerPort = 10210
 
+    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
+    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
+
+    # needs rework since the config can change:
+    # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
+
     debugPageData = {
         # schema in index.ts
         'server': f'{brokerHost}:{brokerPort}',
         'messagesSeen': 0,
         'subscribed': [],
+        "rules": "",
+        "rulesInferred": "",
     }
 
-    inference = Inference()
-    inference.setRules(config)
-    expandedConfig = inference.infer(config)
-    expandedConfig += inference.nonRuleStatements()
-    log.info('expanded config:')
-    for stmt in sorted(expandedConfig):
-        log.info(f'  {stmt}')
-
-    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
-    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
-
-    influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
+    expandedConfigPatchableCopy = PatchableGraph()
 
-    # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
-    srcs = []
-    for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
-        log.info(f'setup source {src=}')
-        try:
-            srcs.append(
-                MqttStatementSource(src,
-                                    expandedConfig,
-                                    masterGraph,
-                                    mqtt=mqtt,
-                                    internalMqtt=internalMqtt,
-                                    debugPageData=debugPageData,
-                                    influxExport=influxExport,
-                                    inference=inference))
-        except EmptyTopicError:
-            continue
-    log.info(f'set up {len(srcs)} sources')
-
-    peg = PatchableGraph()
-    peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig]))
+    runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)
 
     port = 10018
     reactor.listenTCP(port,
                       cyclone.web.Application([
-                          (r"/()", cyclone.web.StaticFileHandler, {
-                              "path": ".",
-                              "default_filename": "index.html"
-                          }),
-                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
-                              "path": "build"
-                          }),
-                          (r"/graph/config", CycloneGraphHandler, {
-                              'masterGraph': peg,
-                          }),
-                          (r"/graph/mqtt", CycloneGraphHandler, {
-                              'masterGraph': masterGraph
-                          }),
-                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
-                              'masterGraph': masterGraph
-                          }),
+                          (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
+                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
+                          (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}),
+                          (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
+                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
                           (r'/debugPageData', DebugPageData),
                           (r'/metrics', Metrics),
                       ],
-                                              mqtt=mqtt,
-                                              internalMqtt=internalMqtt,
-                                              expandedConfig=expandedConfig,
-                                              masterGraph=masterGraph,
+                                            #   mqtt=mqtt,
+                                            #   internalMqtt=internalMqtt,
+                                            #   masterGraph=masterGraph,
                                               debugPageData=debugPageData,
                                               debug=arg['-v']),
                       interface='::')
--- a/service/mqtt_to_rdf/src/index.ts	Sat Oct 23 13:21:06 2021 -0700
+++ b/service/mqtt_to_rdf/src/index.ts	Sat Oct 23 13:22:40 2021 -0700
@@ -40,6 +40,8 @@
   server: string;
   messagesSeen: number;
   subscribed: Subscribed[];
+  rules: string;
+  rulesInferred: string;
 }
 
 @customElement("mqtt-to-rdf-page")
@@ -80,43 +82,50 @@
         currentOutputGraph: { t: 1, n3: "(n3)" },
       },
     ],
+    rules: "",
+    rulesInferred: "",
   };
 
   render() {
     const d = this.pageData;
     const now = Date.now() / 1000;
     const ago = (t: number) => html`${Math.round(now - t)}s ago`;
-    const topicItem = (t: Subscribed, index: number) =>
-      html`<div class="topic" style="grid-column: 1; grid-row: ${index + 2}">
-        <span class="topic">${t.topic} sticky this to graph column</span>
-      </div>`;
-
+    
     const parsedGraphAtTime = (g: GraphAtTime) => html` <div class="graph">graph: ${g.n3}</div> `;
     const recentMessageGraphs = (t: Subscribed, index: number) =>
-      html` <div style="grid-column: 2; grid-row: ${index + 2}">${t.recentMessageGraphs.map(parsedGraphAtTime)}</div> `;
+      html` <div style="grid-column: 1; grid-row: ${index + 2}">
+      <div>Topic: <span class="topic">${t.topic}</span></div>
+      ${t.recentMessageGraphs.map(parsedGraphAtTime)}
+      </div> `;
 
     const metric = (m: Metric) => html`<div>metrix ${m.name} ${JSON.stringify(m.labels)} = ${m.value}</div>`;
     const outputMetrics = (t: Subscribed, index: number) => html` <div style="grid-column: 3; grid-row: ${index + 2}">${t.recentMetrics.map(metric)}</div> `;
     const outputGraph = (t: Subscribed, index: number) =>
-      html` <div style="grid-column: 4; grid-row: ${index + 2}">${parsedGraphAtTime(t.currentOutputGraph)}</div> `;
+      html` <div style="grid-column: 2; grid-row: ${index + 2}">${parsedGraphAtTime(t.currentOutputGraph)}</div> `;
     return html`
       <h1>mqtt_to_rdf</h1>
 
       <section>connected to ${d.server}; messages received ${d.messagesSeen}</section>
 
       <div class="grid">
-        <div class="hd" style="grid-row: 1; grid-column: 1">subscribed topics</div>
-        ${d.subscribed.map(topicItem)}
-
-        <div class="hd" style="grid-row: 1; grid-column: 2">recent incoming messages</div>
+        <div class="hd" style="grid-row: 1; grid-column: 1">recent incoming messages, by topic</div>
         ${d.subscribed.map(recentMessageGraphs)}
 
-        <div class="hd" style="grid-row: 1; grid-column: 3">output metrics: prom collection according to converted graph</div>
+        <div class="hd" style="grid-row: 1; grid-column: 2">output subgraph</div>
+        ${d.subscribed.map(outputGraph)}
+        
+        <div class="hd" style="grid-row: 1; grid-column: 3">output metrics: <br>prom collection according to<br> converted graph</div>
         ${d.subscribed.map(outputMetrics)}
+      </div>
 
-        <div class="hd" style="grid-row: 1; grid-column: 4">output subgraph</div>
-        ${d.subscribed.map(outputGraph)}
-      </section>
+      <table>
+        <tr>
+          <th>Rules input</th>
+          <td><textarea>${this.pageData.rules}</textarea></td>
+          <th>Rules inferred</tH>
+          <td><pre>${this.pageData.rulesInferred}</pre></td>
+        </tr>
+      </table>
     `;
   }
 }