changeset 1644:9e7f571deedf

mqtt_to_rdf.py updates
author drewp@bigasterisk.com
date Fri, 17 Sep 2021 11:04:23 -0700
parents c04b5303eb08
children d3b295c28a26
files service/mqtt_to_rdf/index.html service/mqtt_to_rdf/mqtt_to_rdf.py
diffstat 2 files changed, 43 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/index.html	Fri Sep 17 11:03:13 2021 -0700
+++ b/service/mqtt_to_rdf/index.html	Fri Sep 17 11:04:23 2021 -0700
@@ -1,12 +1,17 @@
-<!doctype html>
+<!DOCTYPE html>
 <html>
   <head>
     <title>mqtt_to_rdf</title>
-    <meta charset="utf-8">
+    <meta charset="utf-8" />
     <script type="module" src="./build/bundle.js"></script>
-    
-    <meta name="mobile-web-app-capable" content="yes">
-    <meta name="viewport" content="width=device-width, initial-scale=1">
+
+    <meta name="mobile-web-app-capable" content="yes" />
+    <meta name="viewport" content="width=device-width, initial-scale=1" />
+    <style>
+      html {
+        background: black;
+      }
+    </style>
   </head>
   <body class="rdfBrowsePage">
     <mqtt-to-rdf-page></mqtt-to-rdf-page>
@@ -21,6 +26,5 @@
         <a href="mqtt">/mqtt</a>
         <a href="mqtt/events">/mqtt/events</a>
       </div> -->
-
   </body>
 </html>
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Sep 17 11:03:13 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Sep 17 11:04:23 2021 -0700
@@ -298,17 +298,17 @@
         cg = ConjunctiveGraph()
         for stmt in newGraph:
             cg.add(stmt + (self.uri,))
-            meas = stmt[0].split('/')[-1]
-            if meas.startswith('airQuality'):
-                where_prefix, type_ = meas[len('airQuality'):].split('door')
-                where = where_prefix + 'door'
-                metric = 'air'
-                tags = {'loc': where.lower(), 'type': type_.lower()}
-                val = stmt[2].toPython()
-                if metric not in collectors:
-                    collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
+            # meas = stmt[0].split('/')[-1]
+            # if meas.startswith('airQuality'):
+            #     where_prefix, type_ = meas[len('airQuality'):].split('door')
+            #     where = where_prefix + 'door'
+            #     metric = 'air'
+            #     tags = {'loc': where.lower(), 'type': type_.lower()}
+            #     val = stmt[2].toPython()
+            #     if metric not in collectors:
+            #         collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
 
-                collectors[metric].labels(**tags).set(val)
+            #     collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, cg)
         self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
@@ -380,28 +380,35 @@
         'subscribed': [],
     }
 
+    inference = Inference()
+    inference.setRules(config)
+    expandedConfig = inference.infer(config)
+    expandedConfig += inference.nonRuleStatements()
+    log.info('expanded config:')
+    for stmt in sorted(expandedConfig):
+        log.info(f'  {stmt}')
+
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
     influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
 
-    inference = Inference()
-    inference.setRules(config)
-    expandedConfig = inference.infer(config)
-    log.info('expanded config:')
-    for stmt in sorted(expandedConfig):
-        log.info(f'  {stmt}')
+    # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
     srcs = []
     for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
-        srcs.append(
-            MqttStatementSource(src,
-                                config,
-                                masterGraph,
-                                mqtt=mqtt,
-                                internalMqtt=internalMqtt,
-                                debugPageData=debugPageData,
-                                influxExport=influxExport,
-                                inference=inference))
+        log.info(f'setup source {src=}')
+        try:
+            srcs.append(
+                MqttStatementSource(src,
+                                    expandedConfig,
+                                    masterGraph,
+                                    mqtt=mqtt,
+                                    internalMqtt=internalMqtt,
+                                    debugPageData=debugPageData,
+                                    influxExport=influxExport,
+                                    inference=inference))
+        except EmptyTopicError:
+            continue
     log.info(f'set up {len(srcs)} sources')
 
     peg = PatchableGraph()
@@ -431,6 +438,7 @@
                       ],
                                               mqtt=mqtt,
                                               internalMqtt=internalMqtt,
+                                              expandedConfig=expandedConfig,
                                               masterGraph=masterGraph,
                                               debugPageData=debugPageData,
                                               debug=arg['-v']),