Mercurial > code > home > repos > homeauto
changeset 1706:2085ed9cfcc4
reworking UI to reflect the new inferencing code
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Oct 2021 13:22:40 -0700 |
parents | 250f4c27d56f |
children | 4a6fffe6a994 |
files | service/mqtt_to_rdf/Dockerfile service/mqtt_to_rdf/mqtt_message.py service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/src/index.ts |
diffstat | 4 files changed, 120 insertions(+), 114 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/Dockerfile Sat Oct 23 13:21:06 2021 -0700 +++ b/service/mqtt_to_rdf/Dockerfile Sat Oct 23 13:22:40 2021 -0700 @@ -14,6 +14,6 @@ COPY src/ ./src RUN pnpm build -COPY *.py *.html ./ +COPY *.py index.html ./ CMD [ "python3", "./mqtt_to_rdf.py", "-v", "--cs=rules" ]
--- a/service/mqtt_to_rdf/mqtt_message.py Sat Oct 23 13:21:06 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_message.py Sat Oct 23 13:22:40 2021 -0700 @@ -8,10 +8,12 @@ JSON = Namespace('http://bigasterisk.com/anyJson/') -def graphFromMessage(topic: bytes, body: bytes): +def graphFromMessage(topicUri: URIRef, topic: bytes, body: bytes): graph = Graph() message = URIRef(f'{uuid.uuid1().urn}') + graph.add((topicUri, ROOM['message'], message)) + graph.add((message, RDF.type, ROOM['MqttMessage'])) topicSegments = BNode()
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Oct 23 13:21:06 2021 -0700 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Sat Oct 23 13:22:40 2021 -0700 @@ -13,7 +13,7 @@ import time from dataclasses import dataclass from pathlib import Path -from typing import Callable, Sequence, Set, Tuple, Union, cast +from typing import Callable, Set, Tuple, Union, cast import cyclone.sse import cyclone.web @@ -29,11 +29,10 @@ from prometheus_client import Counter, Gauge, Histogram, Summary from prometheus_client.exposition import generate_latest from prometheus_client.registry import REGISTRY -from rdfdb.rdflibpatch import graphFromQuads from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef from rdflib.graph import ConjunctiveGraph from rdflib.term import Node -from rx.core import Observable +from rx.core.observable.observable import Observable from rx.core.typing import Mapper from standardservice.logsetup import log, verboseLogging from twisted.internet import reactor, task @@ -230,18 +229,19 @@ class MqttStatementSource: - def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, - influxExport: InfluxExporter, inference: Inference): + def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, + # influxExport: InfluxExporter, + inference: Inference): self.uri = uri - self.config = config + self.masterGraph = masterGraph self.debugPageData = debugPageData self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt - self.influxExport = influxExport + # self.influxExport = influxExport self.inference = inference - self.mqttTopic = self.topicFromConfig(self.config) + self.mqttTopic = topic if self.mqttTopic == b'': raise EmptyTopicError(f"empty topic for {uri=}") log.debug(f'new mqttTopic {self.mqttTopic}') @@ -263,7 +263,7 @@ rawBytes.subscribe_(self.onMessage) def onMessage(self, raw: bytes): - g = graphFromMessage(self.mqttTopic, raw) + g = graphFromMessage(self.uri, self.mqttTopic, raw) logGraph(log.debug, 'message graph', g) appendLimit( self.debugSub['recentMessageGraphs'], @@ -275,10 +275,6 @@ implied = self.inference.infer(g) self.updateMasterGraph(implied) - def topicFromConfig(self, config) -> bytes: - topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) - return b'/'.join(t.encode('ascii') for t in topicParts) - def subscribeMqtt(self, topic: bytes): # goal is to get everyone on the internal broker and eliminate this mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt @@ -347,31 +343,70 @@ self.loop.stop() -@dataclass -class WatchFiles: - # could be merged with rdfdb.service and GraphFile code - globPattern: str - outGraph: Graph +# @dataclass +# class WatchFiles: +# # could be merged with rdfdb.service and GraphFile code +# globPattern: str +# outGraph: Graph + +# def __post_init__(self): +# self.lastUpdate = 0 +# task.LoopingCall(self.refresh).start(1) +# log.info(f'start watching {self.globPattern}') - def __post_init__(self): - self.lastUpdate = 0 - task.LoopingCall(self.refresh).start(1) - log.info(f'start watching {self.globPattern}') +# def refresh(self): +# files = glob.glob(self.globPattern) +# for fn in files: +# if os.path.getmtime(fn) > self.lastUpdate: +# break +# else: +# return +# self.lastUpdate = time.time() +# self.outGraph.remove((None, None, None)) +# log.info('reread config') +# for fn in files: +# # todo: handle read errors +# self.outGraph.parse(fn, format='n3') +# # and notify this change,so we can recalc the latest output + - def refresh(self): - files = glob.glob(self.globPattern) - for fn in files: - if os.path.getmtime(fn) > self.lastUpdate: - break - else: - return - self.lastUpdate = time.time() - self.outGraph.remove((None, None, None)) - log.info('reread config') - for fn in files: - # todo: handle read errors - self.outGraph.parse(fn, format='n3') - # and notify this change,so we can recalc the latest output +class RunState: + """this is rebuilt upon every config reload""" + def __init__(self, + expandedConfigPatchableCopy: PatchableGraph, # for output and display + masterGraph: PatchableGraph, # current sensor outputs + mqtt: MqttClient, + internalMqtt: MqttClient, + # influxExport: InfluxExporter, + inference: Inference): + loadedConfig = ConjunctiveGraph() + loadedConfig.parse('conf/rules.n3', format='n3') + + inference.setRules(loadedConfig) + self.expandedConfig = inference.infer(loadedConfig) + self.expandedConfig += inference.nonRuleStatements() + + ecWithQuads = ConjunctiveGraph() + for s, p, o in self.expandedConfig: + ecWithQuads.add((s, p, o, URIRef('/config'))) + expandedConfigPatchableCopy.setToGraph(ecWithQuads) + + self.srcs = [] + for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): + log.info(f'setup source {src=}') + try: + self.srcs.append( + MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src), + masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData, + # influxExport=influxExport, + inference=inference)) + except EmptyTopicError: + continue + log.info(f'set up {len(self.srcs)} sources') + + def topicFromConfig(self, config, src) -> bytes: + topicParts = list(config.items(config.value(src, ROOM['mqttTopic']))) + return b'/'.join(t.encode('ascii') for t in topicParts) if __name__ == '__main__': @@ -389,87 +424,47 @@ # log.setLevel(logging.DEBUG) log.info('log start') - config = ConjunctiveGraph() - watcher = WatchFiles('conf/rules.n3', config) - # for fn in Path('.').glob('conf/*.n3'): - # if not arg['--cs'] or str(arg['--cs']) in str(fn): - # log.debug(f'loading {fn}') - # config.parse(str(fn), format='n3') - # else: - # log.debug(f'skipping {fn}') - + # config = ConjunctiveGraph() + # watcher = WatchFiles('conf/rules.n3', config) masterGraph = PatchableGraph() + inference = Inference() brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local' brokerPort = 10210 + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated + internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) + + # needs rework since the config can change: + # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) + debugPageData = { # schema in index.ts 'server': f'{brokerHost}:{brokerPort}', 'messagesSeen': 0, 'subscribed': [], + "rules": "", + "rulesInferred": "", } - inference = Inference() - inference.setRules(config) - expandedConfig = inference.infer(config) - expandedConfig += inference.nonRuleStatements() - log.info('expanded config:') - for stmt in sorted(expandedConfig): - log.info(f' {stmt}') - - mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated - internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) - - influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) + expandedConfigPatchableCopy = PatchableGraph() - # this needs to be part of the config reload. Maybe GraphFile patch output would be better? - srcs = [] - for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): - log.info(f'setup source {src=}') - try: - srcs.append( - MqttStatementSource(src, - expandedConfig, - masterGraph, - mqtt=mqtt, - internalMqtt=internalMqtt, - debugPageData=debugPageData, - influxExport=influxExport, - inference=inference)) - except EmptyTopicError: - continue - log.info(f'set up {len(srcs)} sources') - - peg = PatchableGraph() - peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig])) + runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference) port = 10018 reactor.listenTCP(port, cyclone.web.Application([ - (r"/()", cyclone.web.StaticFileHandler, { - "path": ".", - "default_filename": "index.html" - }), - (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { - "path": "build" - }), - (r"/graph/config", CycloneGraphHandler, { - 'masterGraph': peg, - }), - (r"/graph/mqtt", CycloneGraphHandler, { - 'masterGraph': masterGraph - }), - (r"/graph/mqtt/events", CycloneGraphEventsHandler, { - 'masterGraph': masterGraph - }), + (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}), + (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}), + (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}), + (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), + (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}), (r'/debugPageData', DebugPageData), (r'/metrics', Metrics), ], - mqtt=mqtt, - internalMqtt=internalMqtt, - expandedConfig=expandedConfig, - masterGraph=masterGraph, + # mqtt=mqtt, + # internalMqtt=internalMqtt, + # masterGraph=masterGraph, debugPageData=debugPageData, debug=arg['-v']), interface='::')
--- a/service/mqtt_to_rdf/src/index.ts Sat Oct 23 13:21:06 2021 -0700 +++ b/service/mqtt_to_rdf/src/index.ts Sat Oct 23 13:22:40 2021 -0700 @@ -40,6 +40,8 @@ server: string; messagesSeen: number; subscribed: Subscribed[]; + rules: string; + rulesInferred: string; } @customElement("mqtt-to-rdf-page") @@ -80,43 +82,50 @@ currentOutputGraph: { t: 1, n3: "(n3)" }, }, ], + rules: "", + rulesInferred: "", }; render() { const d = this.pageData; const now = Date.now() / 1000; const ago = (t: number) => html`${Math.round(now - t)}s ago`; - const topicItem = (t: Subscribed, index: number) => - html`<div class="topic" style="grid-column: 1; grid-row: ${index + 2}"> - <span class="topic">${t.topic} sticky this to graph column</span> - </div>`; - + const parsedGraphAtTime = (g: GraphAtTime) => html` <div class="graph">graph: ${g.n3}</div> `; const recentMessageGraphs = (t: Subscribed, index: number) => - html` <div style="grid-column: 2; grid-row: ${index + 2}">${t.recentMessageGraphs.map(parsedGraphAtTime)}</div> `; + html` <div style="grid-column: 1; grid-row: ${index + 2}"> + <div>Topic: <span class="topic">${t.topic}</span></div> + ${t.recentMessageGraphs.map(parsedGraphAtTime)} + </div> `; const metric = (m: Metric) => html`<div>metrix ${m.name} ${JSON.stringify(m.labels)} = ${m.value}</div>`; const outputMetrics = (t: Subscribed, index: number) => html` <div style="grid-column: 3; grid-row: ${index + 2}">${t.recentMetrics.map(metric)}</div> `; const outputGraph = (t: Subscribed, index: number) => - html` <div style="grid-column: 4; grid-row: ${index + 2}">${parsedGraphAtTime(t.currentOutputGraph)}</div> `; + html` <div style="grid-column: 2; grid-row: ${index + 2}">${parsedGraphAtTime(t.currentOutputGraph)}</div> `; return html` <h1>mqtt_to_rdf</h1> <section>connected to ${d.server}; messages received ${d.messagesSeen}</section> <div class="grid"> - <div class="hd" style="grid-row: 1; grid-column: 1">subscribed topics</div> - ${d.subscribed.map(topicItem)} - - <div class="hd" style="grid-row: 1; grid-column: 2">recent incoming messages</div> + <div class="hd" style="grid-row: 1; grid-column: 1">recent incoming messages, by topic</div> ${d.subscribed.map(recentMessageGraphs)} - <div class="hd" style="grid-row: 1; grid-column: 3">output metrics: prom collection according to converted graph</div> + <div class="hd" style="grid-row: 1; grid-column: 2">output subgraph</div> + ${d.subscribed.map(outputGraph)} + + <div class="hd" style="grid-row: 1; grid-column: 3">output metrics: <br>prom collection according to<br> converted graph</div> ${d.subscribed.map(outputMetrics)} + </div> - <div class="hd" style="grid-row: 1; grid-column: 4">output subgraph</div> - ${d.subscribed.map(outputGraph)} - </section> + <table> + <tr> + <th>Rules input</th> + <td><textarea>${this.pageData.rules}</textarea></td> + <th>Rules inferred</tH> + <td><pre>${this.pageData.rulesInferred}</pre></td> + </tr> + </table> `; } }