comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 1629:1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
author   | drewp@bigasterisk.com
date     | Sat, 11 Sep 2021 23:31:32 -0700
parents  | b0608eb6e90c
children | 78024b27f9ec
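
For orientation, here is a minimal sketch of the message path this changeset sets up (the old Filters/Parser/Converters/Rdfizer chain is removed in favor of a single onMessage step that runs the new Inference rules). This is not code from the repository: it assumes the project-local modules mqtt_message, inference, and patchablegraph are importable, and the helper name handle_message is made up for illustration; the calls inside it mirror the diff below.

```python
# Minimal sketch (not from the repo) of the flow this changeset introduces,
# assuming the project-local modules below are importable.
from rdflib import Graph, URIRef
from rdflib.graph import ConjunctiveGraph

from inference import Inference              # rules engine applied to config and messages
from mqtt_message import graphFromMessage    # one MQTT payload -> small RDF graph
from patchablegraph import PatchableGraph


def handle_message(topic: bytes, raw: bytes, rules: Graph, uri: URIRef,
                   masterGraph: PatchableGraph):
    """Hypothetical stand-in for MqttStatementSource.onMessage plus its setup."""
    inference = Inference()
    inference.setRules(rules)                 # rules come from the same conf/*.n3 config graph

    msgGraph = graphFromMessage(topic, raw)   # parse the raw MQTT message into a graph
    implied = inference.infer(msgGraph)       # apply the rules to the message graph

    # Mirror updateMasterGraph(): re-home the inferred statements under this
    # source's context and patch them into the shared master graph.
    cg = ConjunctiveGraph()
    for stmt in implied:
        cg.add(stmt + (uri,))
    masterGraph.patchSubgraph(uri, cg)
```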
1628:24e8cd8fcdcd | 1629:1c36ad1eb8b3 |
---|---|
1 """ | 1 """ |
2 Subscribe to mqtt topics; generate RDF statements. | 2 Subscribe to mqtt topics; generate RDF statements. |
3 """ | 3 """ |
| 4 import glob |
| 5 import json |
| 6 import logging |
| 7 import os |
| 8 |
| 9 from mqtt_message import graphFromMessage |
4 import os | 10 import os |
5 import time | 11 import time |
6 import json | 12 from dataclasses import dataclass |
7 import logging | |
8 from pathlib import Path | 13 from pathlib import Path |
9 from typing import Callable, Sequence, Set, Tuple, Union, cast | 14 from typing import Callable, Sequence, Set, Tuple, Union, cast |
10 from cyclone.util import ObjectDict | 15 |
11 from rdflib.graph import ConjunctiveGraph | 16 import cyclone.sse |
12 | |
13 from rx.core.typing import Mapper | |
14 | |
15 from export_to_influxdb import InfluxExporter | |
16 | |
17 import cyclone.web | 17 import cyclone.web |
18 import cyclone.sse | 18 import export_to_influxdb |
19 import prometheus_client | 19 import prometheus_client |
20 import rx | 20 import rx |
21 import rx.operators | 21 import rx.operators |
22 import rx.scheduler.eventloop | 22 import rx.scheduler.eventloop |
23 from docopt import docopt | 23 from docopt import docopt |
| 24 from export_to_influxdb import InfluxExporter |
24 from mqtt_client import MqttClient | 25 from mqtt_client import MqttClient |
25 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) | 26 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) |
26 from prometheus_client import Counter, Gauge, Histogram, Summary | 27 from prometheus_client import Counter, Gauge, Histogram, Summary |
27 from prometheus_client.exposition import generate_latest | 28 from prometheus_client.exposition import generate_latest |
28 from prometheus_client.registry import REGISTRY | 29 from prometheus_client.registry import REGISTRY |
29 from rdfdb.rdflibpatch import graphFromQuads | 30 from rdfdb.rdflibpatch import graphFromQuads |
30 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef | 31 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef |
| 32 from rdflib.graph import ConjunctiveGraph |
31 from rdflib.term import Node | 33 from rdflib.term import Node |
32 from rx.core import Observable | 34 from rx.core import Observable |
| 35 from rx.core.typing import Mapper |
33 from standardservice.logsetup import log, verboseLogging | 36 from standardservice.logsetup import log, verboseLogging |
34 from twisted.internet import reactor, task | 37 from twisted.internet import reactor, task |
35 from dataclasses import dataclass | 38 from inference import Inference |
36 from button_events import button_events | 39 from button_events import button_events |
37 from patch_cyclone_sse import patchCycloneSse | 40 from patch_cyclone_sse import patchCycloneSse |
38 | 41 |
39 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 42 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
40 MESSAGES_SEEN = Counter('mqtt_messages_seen', '') | 43 MESSAGES_SEEN = Counter('mqtt_messages_seen', '') |
41 collectors = {} | 44 collectors = {} |
42 | 45 |
43 import export_to_influxdb | |
44 | |
45 print(f'merge me back {export_to_influxdb}') | |
46 | |
47 patchCycloneSse() | 46 patchCycloneSse() |
| 47 |
| 48 |
| 49 def logGraph(debug: Callable, label: str, graph: Graph): |
| 50 n3 = cast(bytes, graph.serialize(format="n3")) |
| 51 debug(label + ':\n' + n3.decode('utf8')) |
48 | 52 |
49 | 53 |
50 def appendLimit(lst, elem, n=10): | 54 def appendLimit(lst, elem, n=10): |
51 del lst[:len(lst) - n + 1] | 55 del lst[:len(lst) - n + 1] |
52 lst.append(elem) | 56 lst.append(elem) |
208 | 212 |
209 def tightN3(node: Union[URIRef, Literal]) -> str: | 213 def tightN3(node: Union[URIRef, Literal]) -> str: |
210 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') | 214 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:') |
211 | 215 |
212 | 216 |
213 def serializeWithNs(graph: ConjunctiveGraph) -> bytes: | 217 def serializeWithNs(graph: Graph, hidePrefixes=False) -> str: |
214 graph.bind('', ROOM) | 218 graph.bind('', ROOM) |
215 return cast(bytes, graph.serialize(format='n3')) | 219 n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8') |
| 220 if hidePrefixes: |
| 221 n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix')) |
| 222 return n3 |
216 | 223 |
217 | 224 |
218 class MqttStatementSource: | 225 class MqttStatementSource: |
219 | 226 |
220 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, | 227 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData, |
221 influxExport: InfluxExporter): | 228 influxExport: InfluxExporter, inference: Inference): |
222 self.uri = uri | 229 self.uri = uri |
223 self.config = config | 230 self.config = config |
224 self.masterGraph = masterGraph | 231 self.masterGraph = masterGraph |
225 self.debugPageData = debugPageData | 232 self.debugPageData = debugPageData |
226 self.mqtt = mqtt # deprecated | 233 self.mqtt = mqtt # deprecated |
227 self.internalMqtt = internalMqtt | 234 self.internalMqtt = internalMqtt |
228 self.influxExport = influxExport | 235 self.influxExport = influxExport |
| 236 self.inference = inference |
229 | 237 |
230 self.mqttTopic = self.topicFromConfig(self.config) | 238 self.mqttTopic = self.topicFromConfig(self.config) |
231 log.debug(f'new mqttTopic {self.mqttTopic}') | 239 log.debug(f'new mqttTopic {self.mqttTopic}') |
232 | 240 |
233 self.debugSub = { | 241 self.debugSub = { |
234 'topic': self.mqttTopic.decode('ascii'), | 242 'topic': self.mqttTopic.decode('ascii'), |
235 'recentMessages': [], | 243 'recentMessageGraphs': [], |
236 'recentParsed': [], | 244 'recentMetrics': [], |
237 'recentConversions': [], | |
238 'currentMetrics': [], | |
239 'currentOutputGraph': { | 245 'currentOutputGraph': { |
240 't': 1, | 246 't': 1, |
241 'n3': "(n3)" | 247 'n3': "(n3)" |
242 }, | 248 }, |
243 } | 249 } |
244 self.debugPageData['subscribed'].append(self.debugSub) | 250 self.debugPageData['subscribed'].append(self.debugSub) |
245 | 251 |
246 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) | 252 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) |
247 rawBytes.subscribe(on_next=self.countIncomingMessage) | 253 rawBytes.subscribe(on_next=self.countIncomingMessage) |
248 | 254 |
249 filteredBytes = Filters(uri, config).makeOutputStream(rawBytes) | 255 rawBytes.subscribe_(self.onMessage) |
250 | 256 |
251 parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes) | 257 def onMessage(self, raw: bytes): |
252 parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)})) | 258 g = graphFromMessage(self.mqttTopic, raw) |
253 | 259 logGraph(log.debug, 'message graph', g) |
254 convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs) | 260 appendLimit( |
255 convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)})) | 261 self.debugSub['recentMessageGraphs'], |
256 | 262 { # |
257 outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs) | 263 't': truncTime(), |
258 outputQuadsSets.subscribe_(self.updateInflux) | 264 'n3': serializeWithNs(g, hidePrefixes=True) |
259 | 265 }) |
260 outputQuadsSets.subscribe_(self.updateMasterGraph) | 266 |
| 267 implied = self.inference.infer(g) |
| 268 self.updateMasterGraph(implied) |
261 | 269 |
262 def topicFromConfig(self, config) -> bytes: | 270 def topicFromConfig(self, config) -> bytes: |
263 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) | 271 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
264 return b'/'.join(t.encode('ascii') for t in topicParts) | 272 return b'/'.join(t.encode('ascii') for t in topicParts) |
265 | 273 |
270 | 278 |
271 def countIncomingMessage(self, msg: bytes): | 279 def countIncomingMessage(self, msg: bytes): |
272 self.debugPageData['messagesSeen'] += 1 | 280 self.debugPageData['messagesSeen'] += 1 |
273 MESSAGES_SEEN.inc() | 281 MESSAGES_SEEN.inc() |
274 | 282 |
275 appendLimit(self.debugSub['recentMessages'], { | |
276 't': truncTime(), | |
277 'msg': msg.decode('ascii'), | |
278 }) | |
279 | |
280 def updateInflux(self, newGraphs): | 283 def updateInflux(self, newGraphs): |
281 for g in newGraphs: | 284 for g in newGraphs: |
282 self.influxExport.exportToInflux(g) | 285 self.influxExport.exportToInflux(g) |
283 | 286 |
284 def updateMasterGraph(self, newGraphs): | 287 def updateMasterGraph(self, newGraph): |
285 newQuads = set.union(*newGraphs) | 288 log.debug(f'{self.uri} update to {len(newGraph)} statements') |
286 g = graphFromQuads(newQuads) | 289 |
287 log.debug(f'{self.uri} update to {len(newQuads)} statements') | 290 cg = ConjunctiveGraph() |
288 | 291 for stmt in newGraph: |
289 for quad in newQuads: | 292 cg.add(stmt + (self.uri,)) |
290 meas = quad[0].split('/')[-1] | 293 meas = stmt[0].split('/')[-1] |
291 if meas.startswith('airQuality'): | 294 if meas.startswith('airQuality'): |
292 where_prefix, type_ = meas[len('airQuality'):].split('door') | 295 where_prefix, type_ = meas[len('airQuality'):].split('door') |
293 where = where_prefix + 'door' | 296 where = where_prefix + 'door' |
294 metric = 'air' | 297 metric = 'air' |
295 tags = {'loc': where.lower(), 'type': type_.lower()} | 298 tags = {'loc': where.lower(), 'type': type_.lower()} |
296 val = quad[2].toPython() | 299 val = stmt[2].toPython() |
297 if metric not in collectors: | 300 if metric not in collectors: |
298 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) | 301 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) |
299 | 302 |
300 collectors[metric].labels(**tags).set(val) | 303 collectors[metric].labels(**tags).set(val) |
301 | 304 |
302 self.masterGraph.patchSubgraph(self.uri, g) | 305 self.masterGraph.patchSubgraph(self.uri, cg) |
303 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8') | 306 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True) |
304 | 307 |
305 | 308 |
306 class Metrics(cyclone.web.RequestHandler): | 309 class Metrics(cyclone.web.RequestHandler): |
307 | 310 |
308 def get(self): | 311 def get(self): |
344 --cs=STR Only process config filenames with this substring | 347 --cs=STR Only process config filenames with this substring |
345 """) | 348 """) |
346 verboseLogging(arg['-v']) | 349 verboseLogging(arg['-v']) |
347 logging.getLogger('mqtt').setLevel(logging.INFO) | 350 logging.getLogger('mqtt').setLevel(logging.INFO) |
348 logging.getLogger('mqtt_client').setLevel(logging.INFO) | 351 logging.getLogger('mqtt_client').setLevel(logging.INFO) |
| 352 logging.getLogger('infer').setLevel(logging.INFO) |
| 353 log.info('log start') |
349 | 354 |
350 config = Graph() | 355 config = Graph() |
351 for fn in Path('.').glob('conf/*.n3'): | 356 for fn in Path('.').glob('conf/*.n3'): |
352 if not arg['--cs'] or str(arg['--cs']) in str(fn): | 357 if not arg['--cs'] or str(arg['--cs']) in str(fn): |
353 log.debug(f'loading {fn}') | 358 log.debug(f'loading {fn}') |
370 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated | 375 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated |
371 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) | 376 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) |
372 | 377 |
373 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) | 378 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST']) |
374 | 379 |
| 380 inference = Inference() |
| 381 inference.setRules(config) |
| 382 expandedConfig = inference.infer(config) |
| 383 log.info('expanded config:') |
| 384 for stmt in sorted(expandedConfig): |
| 385 log.info(f' {stmt}') |
375 srcs = [] | 386 srcs = [] |
376 for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): | 387 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])): |
377 srcs.append( | 388 srcs.append( |
378 MqttStatementSource(src, | 389 MqttStatementSource(src, |
379 config, | 390 config, |
380 masterGraph, | 391 masterGraph, |
381 mqtt=mqtt, | 392 mqtt=mqtt, |
382 internalMqtt=internalMqtt, | 393 internalMqtt=internalMqtt, |
383 debugPageData=debugPageData, | 394 debugPageData=debugPageData, |
384 influxExport=influxExport)) | 395 influxExport=influxExport, |
| 396 inference=inference)) |
385 log.info(f'set up {len(srcs)} sources') | 397 log.info(f'set up {len(srcs)} sources') |
386 | 398 |
387 port = 10018 | 399 port = 10018 |
388 reactor.listenTCP(port, | 400 reactor.listenTCP(port, |
389 cyclone.web.Application([ | 401 cyclone.web.Application([ |