comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 1706:2085ed9cfcc4

reworking UI to reflect the new inferencing code
author drewp@bigasterisk.com
date Sat, 23 Oct 2021 13:22:40 -0700
parents 34eb87f68ab8
children 7d3797ed6681
comparison
equal deleted inserted replaced
1705:250f4c27d56f 1706:2085ed9cfcc4
11 from mqtt_message import graphFromMessage 11 from mqtt_message import graphFromMessage
12 import os 12 import os
13 import time 13 import time
14 from dataclasses import dataclass 14 from dataclasses import dataclass
15 from pathlib import Path 15 from pathlib import Path
16 from typing import Callable, Sequence, Set, Tuple, Union, cast 16 from typing import Callable, Set, Tuple, Union, cast
17 17
18 import cyclone.sse 18 import cyclone.sse
19 import cyclone.web 19 import cyclone.web
20 import export_to_influxdb 20 import export_to_influxdb
21 import prometheus_client 21 import prometheus_client
27 from mqtt_client import MqttClient 27 from mqtt_client import MqttClient
28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) 28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
29 from prometheus_client import Counter, Gauge, Histogram, Summary 29 from prometheus_client import Counter, Gauge, Histogram, Summary
30 from prometheus_client.exposition import generate_latest 30 from prometheus_client.exposition import generate_latest
31 from prometheus_client.registry import REGISTRY 31 from prometheus_client.registry import REGISTRY
32 from rdfdb.rdflibpatch import graphFromQuads
33 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef 32 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
34 from rdflib.graph import ConjunctiveGraph 33 from rdflib.graph import ConjunctiveGraph
35 from rdflib.term import Node 34 from rdflib.term import Node
36 from rx.core import Observable 35 from rx.core.observable.observable import Observable
37 from rx.core.typing import Mapper 36 from rx.core.typing import Mapper
38 from standardservice.logsetup import log, verboseLogging 37 from standardservice.logsetup import log, verboseLogging
39 from twisted.internet import reactor, task 38 from twisted.internet import reactor, task
40 from inference import Inference 39 from inference import Inference
41 from button_events import button_events 40 from button_events import button_events
228 pass 227 pass
229 228
230 229
231 class MqttStatementSource: 230 class MqttStatementSource:
232 231
233 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, 232 def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
234 influxExport: InfluxExporter, inference: Inference): 233 # influxExport: InfluxExporter,
234 inference: Inference):
235 self.uri = uri 235 self.uri = uri
236 self.config = config 236
237 self.masterGraph = masterGraph 237 self.masterGraph = masterGraph
238 self.debugPageData = debugPageData 238 self.debugPageData = debugPageData
239 self.mqtt = mqtt # deprecated 239 self.mqtt = mqtt # deprecated
240 self.internalMqtt = internalMqtt 240 self.internalMqtt = internalMqtt
241 self.influxExport = influxExport 241 # self.influxExport = influxExport
242 self.inference = inference 242 self.inference = inference
243 243
244 self.mqttTopic = self.topicFromConfig(self.config) 244 self.mqttTopic = topic
245 if self.mqttTopic == b'': 245 if self.mqttTopic == b'':
246 raise EmptyTopicError(f"empty topic for {uri=}") 246 raise EmptyTopicError(f"empty topic for {uri=}")
247 log.debug(f'new mqttTopic {self.mqttTopic}') 247 log.debug(f'new mqttTopic {self.mqttTopic}')
248 248
249 self.debugSub = { 249 self.debugSub = {
261 rawBytes.subscribe(on_next=self.countIncomingMessage) 261 rawBytes.subscribe(on_next=self.countIncomingMessage)
262 262
263 rawBytes.subscribe_(self.onMessage) 263 rawBytes.subscribe_(self.onMessage)
264 264
265 def onMessage(self, raw: bytes): 265 def onMessage(self, raw: bytes):
266 g = graphFromMessage(self.mqttTopic, raw) 266 g = graphFromMessage(self.uri, self.mqttTopic, raw)
267 logGraph(log.debug, 'message graph', g) 267 logGraph(log.debug, 'message graph', g)
268 appendLimit( 268 appendLimit(
269 self.debugSub['recentMessageGraphs'], 269 self.debugSub['recentMessageGraphs'],
270 { # 270 { #
271 't': truncTime(), 271 't': truncTime(),
272 'n3': serializeWithNs(g, hidePrefixes=True) 272 'n3': serializeWithNs(g, hidePrefixes=True)
273 }) 273 })
274 274
275 implied = self.inference.infer(g) 275 implied = self.inference.infer(g)
276 self.updateMasterGraph(implied) 276 self.updateMasterGraph(implied)
277
278 def topicFromConfig(self, config) -> bytes:
279 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
280 return b'/'.join(t.encode('ascii') for t in topicParts)
281 277
282 def subscribeMqtt(self, topic: bytes): 278 def subscribeMqtt(self, topic: bytes):
283 # goal is to get everyone on the internal broker and eliminate this 279 # goal is to get everyone on the internal broker and eliminate this
284 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt 280 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
285 return mqtt.subscribe(topic) 281 return mqtt.subscribe(topic)
345 341
346 def unbind(self): 342 def unbind(self):
347 self.loop.stop() 343 self.loop.stop()
348 344
349 345
350 @dataclass 346 # @dataclass
351 class WatchFiles: 347 # class WatchFiles:
352 # could be merged with rdfdb.service and GraphFile code 348 # # could be merged with rdfdb.service and GraphFile code
353 globPattern: str 349 # globPattern: str
354 outGraph: Graph 350 # outGraph: Graph
355 351
356 def __post_init__(self): 352 # def __post_init__(self):
357 self.lastUpdate = 0 353 # self.lastUpdate = 0
358 task.LoopingCall(self.refresh).start(1) 354 # task.LoopingCall(self.refresh).start(1)
359 log.info(f'start watching {self.globPattern}') 355 # log.info(f'start watching {self.globPattern}')
360 356
361 def refresh(self): 357 # def refresh(self):
362 files = glob.glob(self.globPattern) 358 # files = glob.glob(self.globPattern)
363 for fn in files: 359 # for fn in files:
364 if os.path.getmtime(fn) > self.lastUpdate: 360 # if os.path.getmtime(fn) > self.lastUpdate:
365 break 361 # break
366 else: 362 # else:
367 return 363 # return
368 self.lastUpdate = time.time() 364 # self.lastUpdate = time.time()
369 self.outGraph.remove((None, None, None)) 365 # self.outGraph.remove((None, None, None))
370 log.info('reread config') 366 # log.info('reread config')
371 for fn in files: 367 # for fn in files:
372 # todo: handle read errors 368 # # todo: handle read errors
373 self.outGraph.parse(fn, format='n3') 369 # self.outGraph.parse(fn, format='n3')
374 # and notify this change,so we can recalc the latest output 370 # # and notify this change,so we can recalc the latest output
371
372
373 class RunState:
374 """this is rebuilt upon every config reload"""
375 def __init__(self,
376 expandedConfigPatchableCopy: PatchableGraph, # for output and display
377 masterGraph: PatchableGraph, # current sensor outputs
378 mqtt: MqttClient,
379 internalMqtt: MqttClient,
380 # influxExport: InfluxExporter,
381 inference: Inference):
382 loadedConfig = ConjunctiveGraph()
383 loadedConfig.parse('conf/rules.n3', format='n3')
384
385 inference.setRules(loadedConfig)
386 self.expandedConfig = inference.infer(loadedConfig)
387 self.expandedConfig += inference.nonRuleStatements()
388
389 ecWithQuads = ConjunctiveGraph()
390 for s, p, o in self.expandedConfig:
391 ecWithQuads.add((s, p, o, URIRef('/config')))
392 expandedConfigPatchableCopy.setToGraph(ecWithQuads)
393
394 self.srcs = []
395 for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
396 log.info(f'setup source {src=}')
397 try:
398 self.srcs.append(
399 MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
400 masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData,
401 # influxExport=influxExport,
402 inference=inference))
403 except EmptyTopicError:
404 continue
405 log.info(f'set up {len(self.srcs)} sources')
406
407 def topicFromConfig(self, config, src) -> bytes:
408 topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
409 return b'/'.join(t.encode('ascii') for t in topicParts)
375 410
376 411
377 if __name__ == '__main__': 412 if __name__ == '__main__':
378 arg = docopt(""" 413 arg = docopt("""
379 Usage: mqtt_to_rdf.py [options] 414 Usage: mqtt_to_rdf.py [options]
387 logging.getLogger('infer').setLevel(logging.INFO) 422 logging.getLogger('infer').setLevel(logging.INFO)
388 logging.getLogger('cbind').setLevel(logging.INFO) 423 logging.getLogger('cbind').setLevel(logging.INFO)
389 # log.setLevel(logging.DEBUG) 424 # log.setLevel(logging.DEBUG)
390 log.info('log start') 425 log.info('log start')
391 426
392 config = ConjunctiveGraph() 427 # config = ConjunctiveGraph()
393 watcher = WatchFiles('conf/rules.n3', config) 428 # watcher = WatchFiles('conf/rules.n3', config)
394 # for fn in Path('.').glob('conf/*.n3'):
395 # if not arg['--cs'] or str(arg['--cs']) in str(fn):
396 # log.debug(f'loading {fn}')
397 # config.parse(str(fn), format='n3')
398 # else:
399 # log.debug(f'skipping {fn}')
400
401 masterGraph = PatchableGraph() 429 masterGraph = PatchableGraph()
430 inference = Inference()
402 431
403 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local' 432 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
404 brokerPort = 10210 433 brokerPort = 10210
434
435 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
436 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
437
438 # needs rework since the config can change:
439 # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
405 440
406 debugPageData = { 441 debugPageData = {
407 # schema in index.ts 442 # schema in index.ts
408 'server': f'{brokerHost}:{brokerPort}', 443 'server': f'{brokerHost}:{brokerPort}',
409 'messagesSeen': 0, 444 'messagesSeen': 0,
410 'subscribed': [], 445 'subscribed': [],
446 "rules": "",
447 "rulesInferred": "",
411 } 448 }
412 449
413 inference = Inference() 450 expandedConfigPatchableCopy = PatchableGraph()
414 inference.setRules(config) 451
415 expandedConfig = inference.infer(config) 452 runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)
416 expandedConfig += inference.nonRuleStatements()
417 log.info('expanded config:')
418 for stmt in sorted(expandedConfig):
419 log.info(f' {stmt}')
420
421 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
422 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
423
424 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
425
426 # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
427 srcs = []
428 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
429 log.info(f'setup source {src=}')
430 try:
431 srcs.append(
432 MqttStatementSource(src,
433 expandedConfig,
434 masterGraph,
435 mqtt=mqtt,
436 internalMqtt=internalMqtt,
437 debugPageData=debugPageData,
438 influxExport=influxExport,
439 inference=inference))
440 except EmptyTopicError:
441 continue
442 log.info(f'set up {len(srcs)} sources')
443
444 peg = PatchableGraph()
445 peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig]))
446 453
447 port = 10018 454 port = 10018
448 reactor.listenTCP(port, 455 reactor.listenTCP(port,
449 cyclone.web.Application([ 456 cyclone.web.Application([
450 (r"/()", cyclone.web.StaticFileHandler, { 457 (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
451 "path": ".", 458 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
452 "default_filename": "index.html" 459 (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}),
453 }), 460 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
454 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { 461 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
455 "path": "build"
456 }),
457 (r"/graph/config", CycloneGraphHandler, {
458 'masterGraph': peg,
459 }),
460 (r"/graph/mqtt", CycloneGraphHandler, {
461 'masterGraph': masterGraph
462 }),
463 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
464 'masterGraph': masterGraph
465 }),
466 (r'/debugPageData', DebugPageData), 462 (r'/debugPageData', DebugPageData),
467 (r'/metrics', Metrics), 463 (r'/metrics', Metrics),
468 ], 464 ],
469 mqtt=mqtt, 465 # mqtt=mqtt,
470 internalMqtt=internalMqtt, 466 # internalMqtt=internalMqtt,
471 expandedConfig=expandedConfig, 467 # masterGraph=masterGraph,
472 masterGraph=masterGraph,
473 debugPageData=debugPageData, 468 debugPageData=debugPageData,
474 debug=arg['-v']), 469 debug=arg['-v']),
475 interface='::') 470 interface='::')
476 log.info('serving on %s', port) 471 log.info('serving on %s', port)
477 472