Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 1706:2085ed9cfcc4
reworking UI to reflect the new inferencing code
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Oct 2021 13:22:40 -0700 |
parents | 34eb87f68ab8 |
children | 7d3797ed6681 |
comparison
equal
deleted
inserted
replaced
1705:250f4c27d56f | 1706:2085ed9cfcc4 |
---|---|
11 from mqtt_message import graphFromMessage | 11 from mqtt_message import graphFromMessage |
12 import os | 12 import os |
13 import time | 13 import time |
14 from dataclasses import dataclass | 14 from dataclasses import dataclass |
15 from pathlib import Path | 15 from pathlib import Path |
16 from typing import Callable, Sequence, Set, Tuple, Union, cast | 16 from typing import Callable, Set, Tuple, Union, cast |
17 | 17 |
18 import cyclone.sse | 18 import cyclone.sse |
19 import cyclone.web | 19 import cyclone.web |
20 import export_to_influxdb | 20 import export_to_influxdb |
21 import prometheus_client | 21 import prometheus_client |
27 from mqtt_client import MqttClient | 27 from mqtt_client import MqttClient |
28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) | 28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) |
29 from prometheus_client import Counter, Gauge, Histogram, Summary | 29 from prometheus_client import Counter, Gauge, Histogram, Summary |
30 from prometheus_client.exposition import generate_latest | 30 from prometheus_client.exposition import generate_latest |
31 from prometheus_client.registry import REGISTRY | 31 from prometheus_client.registry import REGISTRY |
32 from rdfdb.rdflibpatch import graphFromQuads | |
33 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef | 32 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef |
34 from rdflib.graph import ConjunctiveGraph | 33 from rdflib.graph import ConjunctiveGraph |
35 from rdflib.term import Node | 34 from rdflib.term import Node |
36 from rx.core import Observable | 35 from rx.core.observable.observable import Observable |
37 from rx.core.typing import Mapper | 36 from rx.core.typing import Mapper |
38 from standardservice.logsetup import log, verboseLogging | 37 from standardservice.logsetup import log, verboseLogging |
39 from twisted.internet import reactor, task | 38 from twisted.internet import reactor, task |
40 from inference import Inference | 39 from inference import Inference |
41 from button_events import button_events | 40 from button_events import button_events |
228 pass | 227 pass |
229 | 228 |
230 | 229 |
231 class MqttStatementSource: | 230 class MqttStatementSource: |
232 | 231 |
233 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, | 232 def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, |
234 influxExport: InfluxExporter, inference: Inference): | 233 # influxExport: InfluxExporter, |
234 inference: Inference): | |
235 self.uri = uri | 235 self.uri = uri |
236 self.config = config | 236 |
237 self.masterGraph = masterGraph | 237 self.masterGraph = masterGraph |
238 self.debugPageData = debugPageData | 238 self.debugPageData = debugPageData |
239 self.mqtt = mqtt # deprecated | 239 self.mqtt = mqtt # deprecated |
240 self.internalMqtt = internalMqtt | 240 self.internalMqtt = internalMqtt |
241 self.influxExport = influxExport | 241 # self.influxExport = influxExport |
242 self.inference = inference | 242 self.inference = inference |
243 | 243 |
244 self.mqttTopic = self.topicFromConfig(self.config) | 244 self.mqttTopic = topic |
245 if self.mqttTopic == b'': | 245 if self.mqttTopic == b'': |
246 raise EmptyTopicError(f"empty topic for {uri=}") | 246 raise EmptyTopicError(f"empty topic for {uri=}") |
247 log.debug(f'new mqttTopic {self.mqttTopic}') | 247 log.debug(f'new mqttTopic {self.mqttTopic}') |
248 | 248 |
249 self.debugSub = { | 249 self.debugSub = { |
261 rawBytes.subscribe(on_next=self.countIncomingMessage) | 261 rawBytes.subscribe(on_next=self.countIncomingMessage) |
262 | 262 |
263 rawBytes.subscribe_(self.onMessage) | 263 rawBytes.subscribe_(self.onMessage) |
264 | 264 |
265 def onMessage(self, raw: bytes): | 265 def onMessage(self, raw: bytes): |
266 g = graphFromMessage(self.mqttTopic, raw) | 266 g = graphFromMessage(self.uri, self.mqttTopic, raw) |
267 logGraph(log.debug, 'message graph', g) | 267 logGraph(log.debug, 'message graph', g) |
268 appendLimit( | 268 appendLimit( |
269 self.debugSub['recentMessageGraphs'], | 269 self.debugSub['recentMessageGraphs'], |
270 { # | 270 { # |
271 't': truncTime(), | 271 't': truncTime(), |
272 'n3': serializeWithNs(g, hidePrefixes=True) | 272 'n3': serializeWithNs(g, hidePrefixes=True) |
273 }) | 273 }) |
274 | 274 |
275 implied = self.inference.infer(g) | 275 implied = self.inference.infer(g) |
276 self.updateMasterGraph(implied) | 276 self.updateMasterGraph(implied) |
277 | |
278 def topicFromConfig(self, config) -> bytes: | |
279 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) | |
280 return b'/'.join(t.encode('ascii') for t in topicParts) | |
281 | 277 |
282 def subscribeMqtt(self, topic: bytes): | 278 def subscribeMqtt(self, topic: bytes): |
283 # goal is to get everyone on the internal broker and eliminate this | 279 # goal is to get everyone on the internal broker and eliminate this |
284 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt | 280 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt |
285 return mqtt.subscribe(topic) | 281 return mqtt.subscribe(topic) |
345 | 341 |
346 def unbind(self): | 342 def unbind(self): |
347 self.loop.stop() | 343 self.loop.stop() |
348 | 344 |
349 | 345 |
350 @dataclass | 346 # @dataclass |
351 class WatchFiles: | 347 # class WatchFiles: |
352 # could be merged with rdfdb.service and GraphFile code | 348 # # could be merged with rdfdb.service and GraphFile code |
353 globPattern: str | 349 # globPattern: str |
354 outGraph: Graph | 350 # outGraph: Graph |
355 | 351 |
356 def __post_init__(self): | 352 # def __post_init__(self): |
357 self.lastUpdate = 0 | 353 # self.lastUpdate = 0 |
358 task.LoopingCall(self.refresh).start(1) | 354 # task.LoopingCall(self.refresh).start(1) |
359 log.info(f'start watching {self.globPattern}') | 355 # log.info(f'start watching {self.globPattern}') |
360 | 356 |
361 def refresh(self): | 357 # def refresh(self): |
362 files = glob.glob(self.globPattern) | 358 # files = glob.glob(self.globPattern) |
363 for fn in files: | 359 # for fn in files: |
364 if os.path.getmtime(fn) > self.lastUpdate: | 360 # if os.path.getmtime(fn) > self.lastUpdate: |
365 break | 361 # break |
366 else: | 362 # else: |
367 return | 363 # return |
368 self.lastUpdate = time.time() | 364 # self.lastUpdate = time.time() |
369 self.outGraph.remove((None, None, None)) | 365 # self.outGraph.remove((None, None, None)) |
370 log.info('reread config') | 366 # log.info('reread config') |
371 for fn in files: | 367 # for fn in files: |
372 # todo: handle read errors | 368 # # todo: handle read errors |
373 self.outGraph.parse(fn, format='n3') | 369 # self.outGraph.parse(fn, format='n3') |
374 # and notify this change,so we can recalc the latest output | 370 # # and notify this change,so we can recalc the latest output |
371 | |
372 | |
373 class RunState: | |
374 """this is rebuilt upon every config reload""" | |
375 def __init__(self, | |
376 expandedConfigPatchableCopy: PatchableGraph, # for output and display | |
377 masterGraph: PatchableGraph, # current sensor outputs | |
378 mqtt: MqttClient, | |
379 internalMqtt: MqttClient, | |
380 # influxExport: InfluxExporter, | |
381 inference: Inference): | |
382 loadedConfig = ConjunctiveGraph() | |
383 loadedConfig.parse('conf/rules.n3', format='n3') | |
384 | |
385 inference.setRules(loadedConfig) | |
386 self.expandedConfig = inference.infer(loadedConfig) | |
387 self.expandedConfig += inference.nonRuleStatements() | |
388 | |
389 ecWithQuads = ConjunctiveGraph() | |
390 for s, p, o in self.expandedConfig: | |
391 ecWithQuads.add((s, p, o, URIRef('/config'))) | |
392 expandedConfigPatchableCopy.setToGraph(ecWithQuads) | |
393 | |
394 self.srcs = [] | |
395 for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): | |
396 log.info(f'setup source {src=}') | |
397 try: | |
398 self.srcs.append( | |
399 MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src), | |
400 masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData, | |
401 # influxExport=influxExport, | |
402 inference=inference)) | |
403 except EmptyTopicError: | |
404 continue | |
405 log.info(f'set up {len(self.srcs)} sources') | |
406 | |
407 def topicFromConfig(self, config, src) -> bytes: | |
408 topicParts = list(config.items(config.value(src, ROOM['mqttTopic']))) | |
409 return b'/'.join(t.encode('ascii') for t in topicParts) | |
375 | 410 |
376 | 411 |
377 if __name__ == '__main__': | 412 if __name__ == '__main__': |
378 arg = docopt(""" | 413 arg = docopt(""" |
379 Usage: mqtt_to_rdf.py [options] | 414 Usage: mqtt_to_rdf.py [options] |
387 logging.getLogger('infer').setLevel(logging.INFO) | 422 logging.getLogger('infer').setLevel(logging.INFO) |
388 logging.getLogger('cbind').setLevel(logging.INFO) | 423 logging.getLogger('cbind').setLevel(logging.INFO) |
389 # log.setLevel(logging.DEBUG) | 424 # log.setLevel(logging.DEBUG) |
390 log.info('log start') | 425 log.info('log start') |
391 | 426 |
392 config = ConjunctiveGraph() | 427 # config = ConjunctiveGraph() |
393 watcher = WatchFiles('conf/rules.n3', config) | 428 # watcher = WatchFiles('conf/rules.n3', config) |
394 # for fn in Path('.').glob('conf/*.n3'): | |
395 # if not arg['--cs'] or str(arg['--cs']) in str(fn): | |
396 # log.debug(f'loading {fn}') | |
397 # config.parse(str(fn), format='n3') | |
398 # else: | |
399 # log.debug(f'skipping {fn}') | |
400 | |
401 masterGraph = PatchableGraph() | 429 masterGraph = PatchableGraph() |
430 inference = Inference() | |
402 | 431 |
403 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local' | 432 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local' |
404 brokerPort = 10210 | 433 brokerPort = 10210 |
434 | |
435 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated | |
436 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) | |
437 | |
438 # needs rework since the config can change: | |
439 # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) | |
405 | 440 |
406 debugPageData = { | 441 debugPageData = { |
407 # schema in index.ts | 442 # schema in index.ts |
408 'server': f'{brokerHost}:{brokerPort}', | 443 'server': f'{brokerHost}:{brokerPort}', |
409 'messagesSeen': 0, | 444 'messagesSeen': 0, |
410 'subscribed': [], | 445 'subscribed': [], |
446 "rules": "", | |
447 "rulesInferred": "", | |
411 } | 448 } |
412 | 449 |
413 inference = Inference() | 450 expandedConfigPatchableCopy = PatchableGraph() |
414 inference.setRules(config) | 451 |
415 expandedConfig = inference.infer(config) | 452 runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference) |
416 expandedConfig += inference.nonRuleStatements() | |
417 log.info('expanded config:') | |
418 for stmt in sorted(expandedConfig): | |
419 log.info(f' {stmt}') | |
420 | |
421 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated | |
422 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) | |
423 | |
424 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) | |
425 | |
426 # this needs to be part of the config reload. Maybe GraphFile patch output would be better? | |
427 srcs = [] | |
428 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): | |
429 log.info(f'setup source {src=}') | |
430 try: | |
431 srcs.append( | |
432 MqttStatementSource(src, | |
433 expandedConfig, | |
434 masterGraph, | |
435 mqtt=mqtt, | |
436 internalMqtt=internalMqtt, | |
437 debugPageData=debugPageData, | |
438 influxExport=influxExport, | |
439 inference=inference)) | |
440 except EmptyTopicError: | |
441 continue | |
442 log.info(f'set up {len(srcs)} sources') | |
443 | |
444 peg = PatchableGraph() | |
445 peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig])) | |
446 | 453 |
447 port = 10018 | 454 port = 10018 |
448 reactor.listenTCP(port, | 455 reactor.listenTCP(port, |
449 cyclone.web.Application([ | 456 cyclone.web.Application([ |
450 (r"/()", cyclone.web.StaticFileHandler, { | 457 (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}), |
451 "path": ".", | 458 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}), |
452 "default_filename": "index.html" | 459 (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}), |
453 }), | 460 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), |
454 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { | 461 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), |
455 "path": "build" | |
456 }), | |
457 (r"/graph/config", CycloneGraphHandler, { | |
458 'masterGraph': peg, | |
459 }), | |
460 (r"/graph/mqtt", CycloneGraphHandler, { | |
461 'masterGraph': masterGraph | |
462 }), | |
463 (r"/graph/mqtt/events", CycloneGraphEventsHandler, { | |
464 'masterGraph': masterGraph | |
465 }), | |
466 (r'/debugPageData', DebugPageData), | 462 (r'/debugPageData', DebugPageData), |
467 (r'/metrics', Metrics), | 463 (r'/metrics', Metrics), |
468 ], | 464 ], |
469 mqtt=mqtt, | 465 # mqtt=mqtt, |
470 internalMqtt=internalMqtt, | 466 # internalMqtt=internalMqtt, |
471 expandedConfig=expandedConfig, | 467 # masterGraph=masterGraph, |
472 masterGraph=masterGraph, | |
473 debugPageData=debugPageData, | 468 debugPageData=debugPageData, |
474 debug=arg['-v']), | 469 debug=arg['-v']), |
475 interface='::') | 470 interface='::') |
476 log.info('serving on %s', port) | 471 log.info('serving on %s', port) |
477 | 472 |