diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 1644:9e7f571deedf

mqtt_to_rdf.py updates
author drewp@bigasterisk.com
date Fri, 17 Sep 2021 11:04:23 -0700
parents c04b5303eb08
children 34eb87f68ab8
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Sep 17 11:03:13 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Sep 17 11:04:23 2021 -0700
@@ -298,17 +298,17 @@
         cg = ConjunctiveGraph()
         for stmt in newGraph:
             cg.add(stmt + (self.uri,))
-            meas = stmt[0].split('/')[-1]
-            if meas.startswith('airQuality'):
-                where_prefix, type_ = meas[len('airQuality'):].split('door')
-                where = where_prefix + 'door'
-                metric = 'air'
-                tags = {'loc': where.lower(), 'type': type_.lower()}
-                val = stmt[2].toPython()
-                if metric not in collectors:
-                    collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
+            # meas = stmt[0].split('/')[-1]
+            # if meas.startswith('airQuality'):
+            #     where_prefix, type_ = meas[len('airQuality'):].split('door')
+            #     where = where_prefix + 'door'
+            #     metric = 'air'
+            #     tags = {'loc': where.lower(), 'type': type_.lower()}
+            #     val = stmt[2].toPython()
+            #     if metric not in collectors:
+            #         collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
 
-                collectors[metric].labels(**tags).set(val)
+            #     collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, cg)
         self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
@@ -380,28 +380,35 @@
         'subscribed': [],
     }
 
+    inference = Inference()
+    inference.setRules(config)
+    expandedConfig = inference.infer(config)
+    expandedConfig += inference.nonRuleStatements()
+    log.info('expanded config:')
+    for stmt in sorted(expandedConfig):
+        log.info(f'  {stmt}')
+
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
     internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
     influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
 
-    inference = Inference()
-    inference.setRules(config)
-    expandedConfig = inference.infer(config)
-    log.info('expanded config:')
-    for stmt in sorted(expandedConfig):
-        log.info(f'  {stmt}')
+    # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
     srcs = []
     for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
-        srcs.append(
-            MqttStatementSource(src,
-                                config,
-                                masterGraph,
-                                mqtt=mqtt,
-                                internalMqtt=internalMqtt,
-                                debugPageData=debugPageData,
-                                influxExport=influxExport,
-                                inference=inference))
+        log.info(f'setup source {src=}')
+        try:
+            srcs.append(
+                MqttStatementSource(src,
+                                    expandedConfig,
+                                    masterGraph,
+                                    mqtt=mqtt,
+                                    internalMqtt=internalMqtt,
+                                    debugPageData=debugPageData,
+                                    influxExport=influxExport,
+                                    inference=inference))
+        except EmptyTopicError:
+            continue
     log.info(f'set up {len(srcs)} sources')
 
     peg = PatchableGraph()
@@ -431,6 +438,7 @@
                       ],
                                               mqtt=mqtt,
                                               internalMqtt=internalMqtt,
+                                              expandedConfig=expandedConfig,
                                               masterGraph=masterGraph,
                                               debugPageData=debugPageData,
                                               debug=arg['-v']),