# HG changeset patch
# User drewp@bigasterisk.com
# Date 1631901863 25200
# Node ID 9e7f571deedfcb7932a0d70535663def256d559d
# Parent c04b5303eb081ae12a4693fa03eb6101d616fe75
mqtt_to_rdf.py updates
diff -r c04b5303eb08 -r 9e7f571deedf service/mqtt_to_rdf/index.html
--- a/service/mqtt_to_rdf/index.html Fri Sep 17 11:03:13 2021 -0700
+++ b/service/mqtt_to_rdf/index.html Fri Sep 17 11:04:23 2021 -0700
@@ -1,12 +1,17 @@
-
+
mqtt_to_rdf
-
+
-
-
-
+
+
+
+
@@ -21,6 +26,5 @@
/mqtt
/mqtt/events
-->
-
diff -r c04b5303eb08 -r 9e7f571deedf service/mqtt_to_rdf/mqtt_to_rdf.py
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Sep 17 11:03:13 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Sep 17 11:04:23 2021 -0700
@@ -298,17 +298,17 @@
cg = ConjunctiveGraph()
for stmt in newGraph:
cg.add(stmt + (self.uri,))
- meas = stmt[0].split('/')[-1]
- if meas.startswith('airQuality'):
- where_prefix, type_ = meas[len('airQuality'):].split('door')
- where = where_prefix + 'door'
- metric = 'air'
- tags = {'loc': where.lower(), 'type': type_.lower()}
- val = stmt[2].toPython()
- if metric not in collectors:
- collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
+ # meas = stmt[0].split('/')[-1]
+ # if meas.startswith('airQuality'):
+ # where_prefix, type_ = meas[len('airQuality'):].split('door')
+ # where = where_prefix + 'door'
+ # metric = 'air'
+ # tags = {'loc': where.lower(), 'type': type_.lower()}
+ # val = stmt[2].toPython()
+ # if metric not in collectors:
+ # collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
- collectors[metric].labels(**tags).set(val)
+ # collectors[metric].labels(**tags).set(val)
self.masterGraph.patchSubgraph(self.uri, cg)
self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
@@ -380,28 +380,35 @@
'subscribed': [],
}
+ inference = Inference()
+ inference.setRules(config)
+ expandedConfig = inference.infer(config)
+ expandedConfig += inference.nonRuleStatements()
+ log.info('expanded config:')
+ for stmt in sorted(expandedConfig):
+ log.info(f' {stmt}')
+
mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
- inference = Inference()
- inference.setRules(config)
- expandedConfig = inference.infer(config)
- log.info('expanded config:')
- for stmt in sorted(expandedConfig):
- log.info(f' {stmt}')
+ # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
srcs = []
for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
- srcs.append(
- MqttStatementSource(src,
- config,
- masterGraph,
- mqtt=mqtt,
- internalMqtt=internalMqtt,
- debugPageData=debugPageData,
- influxExport=influxExport,
- inference=inference))
+ log.info(f'setup source {src=}')
+ try:
+ srcs.append(
+ MqttStatementSource(src,
+ expandedConfig,
+ masterGraph,
+ mqtt=mqtt,
+ internalMqtt=internalMqtt,
+ debugPageData=debugPageData,
+ influxExport=influxExport,
+ inference=inference))
+ except EmptyTopicError:
+ continue
log.info(f'set up {len(srcs)} sources')
peg = PatchableGraph()
@@ -431,6 +438,7 @@
],
mqtt=mqtt,
internalMqtt=internalMqtt,
+ expandedConfig=expandedConfig,
masterGraph=masterGraph,
debugPageData=debugPageData,
debug=arg['-v']),