# HG changeset patch # User drewp@bigasterisk.com # Date 1631901863 25200 # Node ID 9e7f571deedfcb7932a0d70535663def256d559d # Parent c04b5303eb081ae12a4693fa03eb6101d616fe75 mqtt_to_rdf.py updates diff -r c04b5303eb08 -r 9e7f571deedf service/mqtt_to_rdf/index.html --- a/service/mqtt_to_rdf/index.html Fri Sep 17 11:03:13 2021 -0700 +++ b/service/mqtt_to_rdf/index.html Fri Sep 17 11:04:23 2021 -0700 @@ -1,12 +1,17 @@ - + mqtt_to_rdf - + - - - + + + + @@ -21,6 +26,5 @@ /mqtt /mqtt/events --> - diff -r c04b5303eb08 -r 9e7f571deedf service/mqtt_to_rdf/mqtt_to_rdf.py --- a/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Sep 17 11:03:13 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Sep 17 11:04:23 2021 -0700 @@ -298,17 +298,17 @@ cg = ConjunctiveGraph() for stmt in newGraph: cg.add(stmt + (self.uri,)) - meas = stmt[0].split('/')[-1] - if meas.startswith('airQuality'): - where_prefix, type_ = meas[len('airQuality'):].split('door') - where = where_prefix + 'door' - metric = 'air' - tags = {'loc': where.lower(), 'type': type_.lower()} - val = stmt[2].toPython() - if metric not in collectors: - collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) + # meas = stmt[0].split('/')[-1] + # if meas.startswith('airQuality'): + # where_prefix, type_ = meas[len('airQuality'):].split('door') + # where = where_prefix + 'door' + # metric = 'air' + # tags = {'loc': where.lower(), 'type': type_.lower()} + # val = stmt[2].toPython() + # if metric not in collectors: + # collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) - collectors[metric].labels(**tags).set(val) + # collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, cg) self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True) @@ -380,28 +380,35 @@ 'subscribed': [], } + inference = Inference() + inference.setRules(config) + expandedConfig = inference.infer(config) + expandedConfig += inference.nonRuleStatements() + log.info('expanded config:') + for stmt in sorted(expandedConfig): + log.info(f' {stmt}') + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) - inference = Inference() - inference.setRules(config) - expandedConfig = inference.infer(config) - log.info('expanded config:') - for stmt in sorted(expandedConfig): - log.info(f' {stmt}') + # this needs to be part of the config reload. Maybe GraphFile patch output would be better? srcs = [] for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): - srcs.append( - MqttStatementSource(src, - config, - masterGraph, - mqtt=mqtt, - internalMqtt=internalMqtt, - debugPageData=debugPageData, - influxExport=influxExport, - inference=inference)) + log.info(f'setup source {src=}') + try: + srcs.append( + MqttStatementSource(src, + expandedConfig, + masterGraph, + mqtt=mqtt, + internalMqtt=internalMqtt, + debugPageData=debugPageData, + influxExport=influxExport, + inference=inference)) + except EmptyTopicError: + continue log.info(f'set up {len(srcs)} sources') peg = PatchableGraph() @@ -431,6 +438,7 @@ ], mqtt=mqtt, internalMqtt=internalMqtt, + expandedConfig=expandedConfig, masterGraph=masterGraph, debugPageData=debugPageData, debug=arg['-v']),