comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 1629:1c36ad1eb8b3

do inference on config. backend for new ui columns. rm some of the old filter pipeline
author drewp@bigasterisk.com
date Sat, 11 Sep 2021 23:31:32 -0700
parents b0608eb6e90c
children 78024b27f9ec
comparison
equal deleted inserted replaced
1628:24e8cd8fcdcd 1629:1c36ad1eb8b3
1 """ 1 """
2 Subscribe to mqtt topics; generate RDF statements. 2 Subscribe to mqtt topics; generate RDF statements.
3 """ 3 """
4 import glob
5 import json
6 import logging
7 import os
8
9 from mqtt_message import graphFromMessage
4 import os 10 import os
5 import time 11 import time
6 import json 12 from dataclasses import dataclass
7 import logging
8 from pathlib import Path 13 from pathlib import Path
9 from typing import Callable, Sequence, Set, Tuple, Union, cast 14 from typing import Callable, Sequence, Set, Tuple, Union, cast
10 from cyclone.util import ObjectDict 15
11 from rdflib.graph import ConjunctiveGraph 16 import cyclone.sse
12
13 from rx.core.typing import Mapper
14
15 from export_to_influxdb import InfluxExporter
16
17 import cyclone.web 17 import cyclone.web
18 import cyclone.sse 18 import export_to_influxdb
19 import prometheus_client 19 import prometheus_client
20 import rx 20 import rx
21 import rx.operators 21 import rx.operators
22 import rx.scheduler.eventloop 22 import rx.scheduler.eventloop
23 from docopt import docopt 23 from docopt import docopt
24 from export_to_influxdb import InfluxExporter
24 from mqtt_client import MqttClient 25 from mqtt_client import MqttClient
25 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) 26 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
26 from prometheus_client import Counter, Gauge, Histogram, Summary 27 from prometheus_client import Counter, Gauge, Histogram, Summary
27 from prometheus_client.exposition import generate_latest 28 from prometheus_client.exposition import generate_latest
28 from prometheus_client.registry import REGISTRY 29 from prometheus_client.registry import REGISTRY
29 from rdfdb.rdflibpatch import graphFromQuads 30 from rdfdb.rdflibpatch import graphFromQuads
30 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef 31 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
32 from rdflib.graph import ConjunctiveGraph
31 from rdflib.term import Node 33 from rdflib.term import Node
32 from rx.core import Observable 34 from rx.core import Observable
35 from rx.core.typing import Mapper
33 from standardservice.logsetup import log, verboseLogging 36 from standardservice.logsetup import log, verboseLogging
34 from twisted.internet import reactor, task 37 from twisted.internet import reactor, task
35 from dataclasses import dataclass 38 from inference import Inference
36 from button_events import button_events 39 from button_events import button_events
37 from patch_cyclone_sse import patchCycloneSse 40 from patch_cyclone_sse import patchCycloneSse
38 41
# Namespace for the in-house RDF vocabulary used throughout this service.
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Total mqtt messages received, summed over every subscription.
MESSAGES_SEEN = Counter('mqtt_messages_seen', '')

# prometheus collectors keyed by metric name, created lazily as readings arrive.
collectors = {}

patchCycloneSse()
47
48
def logGraph(debug: Callable, label: str, graph: Graph):
    """Serialize `graph` as n3 and emit it, prefixed by `label`, through the
    given log function (e.g. log.debug)."""
    serialized = cast(bytes, graph.serialize(format="n3")).decode('utf8')
    debug(f'{label}:\n{serialized}')
48 52
49 53
def appendLimit(lst, elem, n=10):
    """Append `elem` to `lst` in place, keeping only the most recent `n` items.

    Bug fix: the old `del lst[:len(lst) - n + 1]` produced a *negative* slice
    stop whenever len(lst) < n - 1, which Python wraps around to the end of the
    list — e.g. with n=10 and 8 elements it deleted the first 7 instead of
    none. Clamp the stop at 0 so short lists are never trimmed.
    """
    del lst[:max(0, len(lst) - n + 1)]
    lst.append(elem)
208 212
def tightN3(node: Union[URIRef, Literal]) -> str:
    """Render `node` as n3, abbreviating the XSD namespace to 'xsd:'."""
    full = node.n3()
    return full.replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
211 215
212 216
def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
    """Serialize `graph` as n3 with ROOM bound as the default namespace.

    With hidePrefixes=True, drop the @prefix declaration lines from the
    output (useful for compact debug-page display).
    """
    graph.bind('', ROOM)
    n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8')
    if hidePrefixes:
        kept = [line for line in n3.splitlines(keepends=True)
                if not line.strip().startswith('@prefix')]
        n3 = ''.join(kept)
    return n3
216 223
217 224
218 class MqttStatementSource: 225 class MqttStatementSource:
219 226
220 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, 227 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
221 influxExport: InfluxExporter): 228 influxExport: InfluxExporter, inference: Inference):
222 self.uri = uri 229 self.uri = uri
223 self.config = config 230 self.config = config
224 self.masterGraph = masterGraph 231 self.masterGraph = masterGraph
225 self.debugPageData = debugPageData 232 self.debugPageData = debugPageData
226 self.mqtt = mqtt # deprecated 233 self.mqtt = mqtt # deprecated
227 self.internalMqtt = internalMqtt 234 self.internalMqtt = internalMqtt
228 self.influxExport = influxExport 235 self.influxExport = influxExport
236 self.inference = inference
229 237
230 self.mqttTopic = self.topicFromConfig(self.config) 238 self.mqttTopic = self.topicFromConfig(self.config)
231 log.debug(f'new mqttTopic {self.mqttTopic}') 239 log.debug(f'new mqttTopic {self.mqttTopic}')
232 240
233 self.debugSub = { 241 self.debugSub = {
234 'topic': self.mqttTopic.decode('ascii'), 242 'topic': self.mqttTopic.decode('ascii'),
235 'recentMessages': [], 243 'recentMessageGraphs': [],
236 'recentParsed': [], 244 'recentMetrics': [],
237 'recentConversions': [],
238 'currentMetrics': [],
239 'currentOutputGraph': { 245 'currentOutputGraph': {
240 't': 1, 246 't': 1,
241 'n3': "(n3)" 247 'n3': "(n3)"
242 }, 248 },
243 } 249 }
244 self.debugPageData['subscribed'].append(self.debugSub) 250 self.debugPageData['subscribed'].append(self.debugSub)
245 251
246 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) 252 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
247 rawBytes.subscribe(on_next=self.countIncomingMessage) 253 rawBytes.subscribe(on_next=self.countIncomingMessage)
248 254
249 filteredBytes = Filters(uri, config).makeOutputStream(rawBytes) 255 rawBytes.subscribe_(self.onMessage)
250 256
251 parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes) 257 def onMessage(self, raw: bytes):
252 parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)})) 258 g = graphFromMessage(self.mqttTopic, raw)
253 259 logGraph(log.debug, 'message graph', g)
254 convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs) 260 appendLimit(
255 convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) 261 self.debugSub['recentMessageGraphs'],
256 262 { #
257 outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) 263 't': truncTime(),
258 outputQuadsSets.subscribe_(self.updateInflux) 264 'n3': serializeWithNs(g, hidePrefixes=True)
259 265 })
260 outputQuadsSets.subscribe_(self.updateMasterGraph) 266
267 implied = self.inference.infer(g)
268 self.updateMasterGraph(implied)
261 269
262 def topicFromConfig(self, config) -> bytes: 270 def topicFromConfig(self, config) -> bytes:
263 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) 271 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
264 return b'/'.join(t.encode('ascii') for t in topicParts) 272 return b'/'.join(t.encode('ascii') for t in topicParts)
265 273
270 278
271 def countIncomingMessage(self, msg: bytes): 279 def countIncomingMessage(self, msg: bytes):
272 self.debugPageData['messagesSeen'] += 1 280 self.debugPageData['messagesSeen'] += 1
273 MESSAGES_SEEN.inc() 281 MESSAGES_SEEN.inc()
274 282
275 appendLimit(self.debugSub['recentMessages'], {
276 't': truncTime(),
277 'msg': msg.decode('ascii'),
278 })
279
280 def updateInflux(self, newGraphs): 283 def updateInflux(self, newGraphs):
281 for g in newGraphs: 284 for g in newGraphs:
282 self.influxExport.exportToInflux(g) 285 self.influxExport.exportToInflux(g)
283 286
284 def updateMasterGraph(self, newGraphs): 287 def updateMasterGraph(self, newGraph):
285 newQuads = set.union(*newGraphs) 288 log.debug(f'{self.uri} update to {len(newGraph)} statements')
286 g = graphFromQuads(newQuads) 289
287 log.debug(f'{self.uri} update to {len(newQuads)} statements') 290 cg = ConjunctiveGraph()
288 291 for stmt in newGraph:
289 for quad in newQuads: 292 cg.add(stmt + (self.uri,))
290 meas = quad[0].split('/')[-1] 293 meas = stmt[0].split('/')[-1]
291 if meas.startswith('airQuality'): 294 if meas.startswith('airQuality'):
292 where_prefix, type_ = meas[len('airQuality'):].split('door') 295 where_prefix, type_ = meas[len('airQuality'):].split('door')
293 where = where_prefix + 'door' 296 where = where_prefix + 'door'
294 metric = 'air' 297 metric = 'air'
295 tags = {'loc': where.lower(), 'type': type_.lower()} 298 tags = {'loc': where.lower(), 'type': type_.lower()}
296 val = quad[2].toPython() 299 val = stmt[2].toPython()
297 if metric not in collectors: 300 if metric not in collectors:
298 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) 301 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
299 302
300 collectors[metric].labels(**tags).set(val) 303 collectors[metric].labels(**tags).set(val)
301 304
302 self.masterGraph.patchSubgraph(self.uri, g) 305 self.masterGraph.patchSubgraph(self.uri, cg)
303 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8') 306 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
304 307
305 308
306 class Metrics(cyclone.web.RequestHandler): 309 class Metrics(cyclone.web.RequestHandler):
307 310
308 def get(self): 311 def get(self):
344 --cs=STR Only process config filenames with this substring 347 --cs=STR Only process config filenames with this substring
345 """) 348 """)
346 verboseLogging(arg['-v']) 349 verboseLogging(arg['-v'])
347 logging.getLogger('mqtt').setLevel(logging.INFO) 350 logging.getLogger('mqtt').setLevel(logging.INFO)
348 logging.getLogger('mqtt_client').setLevel(logging.INFO) 351 logging.getLogger('mqtt_client').setLevel(logging.INFO)
352 logging.getLogger('infer').setLevel(logging.INFO)
353 log.info('log start')
349 354
350 config = Graph() 355 config = Graph()
351 for fn in Path('.').glob('conf/*.n3'): 356 for fn in Path('.').glob('conf/*.n3'):
352 if not arg['--cs'] or str(arg['--cs']) in str(fn): 357 if not arg['--cs'] or str(arg['--cs']) in str(fn):
353 log.debug(f'loading {fn}') 358 log.debug(f'loading {fn}')
370 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated 375 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
371 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) 376 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
372 377
373 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) 378 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
374 379
380 inference = Inference()
381 inference.setRules(config)
382 expandedConfig = inference.infer(config)
383 log.info('expanded config:')
384 for stmt in sorted(expandedConfig):
385 log.info(f' {stmt}')
375 srcs = [] 386 srcs = []
376 for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): 387 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
377 srcs.append( 388 srcs.append(
378 MqttStatementSource(src, 389 MqttStatementSource(src,
379 config, 390 config,
380 masterGraph, 391 masterGraph,
381 mqtt=mqtt, 392 mqtt=mqtt,
382 internalMqtt=internalMqtt, 393 internalMqtt=internalMqtt,
383 debugPageData=debugPageData, 394 debugPageData=debugPageData,
384 influxExport=influxExport)) 395 influxExport=influxExport,
396 inference=inference))
385 log.info(f'set up {len(srcs)} sources') 397 log.info(f'set up {len(srcs)} sources')
386 398
387 port = 10018 399 port = 10018
388 reactor.listenTCP(port, 400 reactor.listenTCP(port,
389 cyclone.web.Application([ 401 cyclone.web.Application([