Mercurial > code > home > repos > homeauto
annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1726:7d3797ed6681
rough port to starlette and reactivex
author | drewp@bigasterisk.com |
---|---|
date | Tue, 20 Jun 2023 23:14:28 -0700 |
parents | 2085ed9cfcc4 |
children | 23e6154e6c11 |
rev | line source |
---|---|
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
1 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
2 Subscribe to mqtt topics; generate RDF statements. |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
3 """ |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
4 import asyncio |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
5 import json |
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
6 import logging |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
7 import time |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
8 from dataclasses import dataclass |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
9 from typing import Callable, List, Set, Tuple, Union, cast |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
10 from mqttrx import MqttClient |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
11 from reactivex import Observable, empty, operators |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
12 import reactivex |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
13 from reactivex.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
14 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
15 from patchablegraph import PatchableGraph |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
16 from patchablegraph.handler import GraphEvents, StaticGraph |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
17 from prometheus_client import Counter |
797 | 18 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
19 from rdflib.graph import ConjunctiveGraph |
780 | 20 from rdflib.term import Node |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
21 from starlette.applications import Starlette |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
22 from starlette.routing import Route |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
23 from starlette.staticfiles import StaticFiles |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
24 from reactivex.typing import Mapper |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
25 from starlette_exporter import handle_metrics |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
26 from starlette_exporter.middleware import PrometheusMiddleware |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
27 |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
28 from button_events import button_events |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
29 from inference import Inference |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
30 from mqtt_message import graphFromMessage |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
31 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
# getLogger() with no name returns the root logger.
log = logging.getLogger()
# Namespace for the config vocabulary queried throughout this module
# (e.g. ROOM['parser'], ROOM['conversions'], ROOM['graphStatements']).
ROOM = Namespace('http://projects.bigasterisk.com/room/')
# Prometheus counter; presumably bumped once per incoming MQTT message --
# the increment site is not in this part of the file, confirm there.
MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
# NOTE(review): not referenced in the visible part of the file; purpose unconfirmed.
collectors = {}
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
36 |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
def logGraph(debug: Callable, label: str, graph: Graph):
    """Send an N3 dump of `graph` to a logging callable.

    Args:
        debug: called once with the finished message, e.g. `log.debug`.
        label: heading placed before the dump, followed by ':' and a newline.
        graph: graph to serialize.
    """
    n3 = graph.serialize(format="n3")
    # rdflib 6+ returns str from serialize() when no encoding is requested;
    # older releases returned utf8 bytes. Accept either.
    if isinstance(n3, bytes):
        n3 = n3.decode('utf8')
    debug(label + ':\n' + n3)
1583 | 40 |
780 | 41 |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
def appendLimit(lst, elem, n=10):
    """Append `elem` to `lst` in place, keeping at most `n` items.

    The oldest items (front of the list) are dropped to make room. The new
    element is always kept, so with n <= 0 the list ends up holding just
    `elem`.

    Args:
        lst: list to mutate.
        elem: item to add at the end.
        n: maximum length of `lst` after the call.
    """
    excess = len(lst) - n + 1
    # Only trim when over the limit: a negative slice bound would count from
    # the end of the list and throw away entries that still fit.
    if excess > 0:
        del lst[:excess]
    lst.append(elem)
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
45 |
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
46 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
def parseDurationLiteral(lit: Literal) -> float:
    """Convert a duration literal such as '5s' or '0.25s' to seconds.

    Only a plain number followed by a single trailing 's' is understood.

    Args:
        lit: the duration text (a string-like literal).

    Returns:
        The number of seconds as a float.

    Raises:
        NotImplementedError: for any other unit or unparseable text
            (for example '10ms' or '1s30s').
    """
    if lit.endswith('s'):
        try:
            # Strip only the final 's'; splitting on every 's' would quietly
            # accept '1s30s' as 1.0.
            return float(lit[:-1])
        except ValueError as err:
            raise NotImplementedError(f'duration literal: {lit}') from err
    raise NotImplementedError(f'duration literal: {lit}')
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
51 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
52 |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
@dataclass
class StreamPipelineStep:
    """Base class for one stage of a stream pipeline.

    Subclasses override makeOutputStream() and read their settings for `uri`
    out of `config`; this base implementation passes the stream through.
    """
    uri: URIRef  # a :MqttStatementSource
    config: Graph  # graph the subclasses query for settings about `uri`

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return the stream for the next stage; here, `inStream` unchanged."""
        return inStream
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
60 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
61 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Filters(StreamPipelineStep):
    """Pipeline stage that drops payloads not matching the source's filter.

    The only filter read from the config is :filterPayloadJsonEquals, whose
    value is JSON text that each payload must equal after parsing.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return `inStream` restricted to payloads that pass the filter.

        With no (or a falsy) :filterPayloadJsonEquals value for this source,
        the input stream is returned untouched.

        Args:
            inStream: stream of raw payloads; each item must offer
                `.decode('utf8')` (i.e. bytes).

        Returns:
            The filtered stream.
        """
        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if not jsonEq:
            return inStream

        # Parse the expected value once; comparison is on parsed JSON, so
        # key order and whitespace in the payload do not matter.
        required = json.loads(jsonEq.toPython())

        def eq(jsonBytes):
            try:
                msg = json.loads(jsonBytes.decode('utf8'))
            except ValueError:
                # Covers bad utf8 and bad JSON (both are ValueError
                # subclasses). Letting it escape would be delivered as
                # on_error and end the whole stream over one bad message.
                log.debug('ignoring unparseable payload for %s: %r', self.uri, jsonBytes)
                return False
            return msg == required

        return operators.filter(eq)(inStream)
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
77 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
78 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Parser(StreamPipelineStep):
    """Pipeline stage that turns raw payload bytes into RDF nodes.

    Which conversion is used comes from the source's :parser value in the
    config graph.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply the configured parser to every item of `inStream`."""
        return self.getParser()(inStream)

    def getParser(self) -> Callable[[Observable], Observable]:
        """Build a map operator around the parse function chosen by :parser."""
        kind = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
        parseOne = self.getParserFunc(kind)
        return operators.map(cast(Mapper, parseOne))

    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
        """Pick the bytes -> Node function for `parserType`.

        Recognized values, checked in this order: xsd:double, :tagIdToUri,
        :onOffBrightness, :jsonBrightness, any node typed :ValueMap in the
        config, :rfCode, :tradfri.

        Raises:
            NotImplementedError: if `parserType` is none of the above.
        """
        if parserType == XSD.double:
            return lambda v: Literal(float(v))
        if parserType == ROOM['tagIdToUri']:
            return self.tagIdToUri
        if parserType == ROOM['onOffBrightness']:
            # anything other than the exact payload b'OFF' counts as on
            return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
        if parserType == ROOM['jsonBrightness']:
            return self.parseJsonBrightness
        if ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
            return lambda v: self.remap(parserType, v.decode('utf8'))
        if parserType == ROOM['rfCode']:
            return self.parseJsonRfCode
        if parserType == ROOM['tradfri']:
            return self.parseTradfriMessage
        raise NotImplementedError(parserType)

    def tagIdToUri(self, value: bytes) -> URIRef:
        """Map a hex tag id (dashes allowed) to its rfidCard URI.

        Raises:
            ValueError: if the id, with dashes removed, is not hexadecimal.
        """
        hexDigits = value.decode('utf8').replace('-', '').lower()
        int(hexDigits, 16)  # result unused; this only rejects non-hex input
        return URIRef(f'http://bigasterisk.com/rfidCard/{hexDigits}')

    def parseJsonBrightness(self, mqttValue: bytes):
        """Read a JSON message with 'state' and 'brightness' keys.

        Returns brightness/255 as a Literal when state is 'ON', else 0.0.
        """
        msg = json.loads(mqttValue.decode('utf8'))
        if msg['state'] == 'ON':
            return Literal(float(msg['brightness'] / 255))
        return Literal(0.0)

    def remap(self, parser, valueStr: str) -> Node:
        """Translate `valueStr` through the :map entries of `parser`.

        Each entry is matched on its :from value; the first match's :to
        node is returned.

        Raises:
            TypeError: if the matching entry's :to value is not a Node
                (e.g. it is missing).
            KeyError: if no entry matches.
        """
        cfg = self.config
        wanted = Literal(valueStr)
        for mapping in cfg.objects(parser, ROOM['map']):
            if not (wanted == cfg.value(mapping, ROOM['from'])):
                continue
            target = cfg.value(mapping, ROOM['to'])
            if isinstance(target, Node):
                return target
            to_ = target  # keep the name that the '=' f-string prints
            raise TypeError(f'{to_=}')
        raise KeyError(wanted)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Format the 'code0' and 'code1' fields of a JSON message as 16 hex digits."""
        msg = json.loads(mqttValue.decode('utf8'))
        codes = (msg['code0'], msg['code1'])
        return Literal('%08x%08x' % codes)

    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
        """Placeholder: logs the payload and returns the literal 'todo'."""
        log.info(f'trad {mqttValue}')
        return Literal('todo')
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
135 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
136 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Converters(StreamPipelineStep):
    """Pipeline stage that applies the source's :conversions, in list order."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Chain one operator per entry of the :conversions list onto `inStream`."""
        cfg = self.config
        conversions = cfg.items(cfg.value(self.uri, ROOM['conversions']))
        stream = inStream
        for conv in conversions:
            stream = self.conversionStep(conv)(stream)
        return stream

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the stream operator for one conversion node.

        Handles :celsiusToFarenheit, any node carrying an :ignoreValueBelow
        threshold, and :buttonPress.

        Raises:
            NotImplementedError: for any other conversion node.
        """
        if conv == ROOM['celsiusToFarenheit']:
            return operators.map(cast(Mapper, self.c2f))

        limit = self.config.value(conv, ROOM['ignoreValueBelow'], default=None)
        if limit is not None:
            threshold = cast(Literal, limit).toPython()
            # keeps values at or above the threshold
            return operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)

        if conv == ROOM['buttonPress']:
            return button_events(min_hold_sec=1.0, release_after_sec=1.0)

        raise NotImplementedError(conv)

    def c2f(self, value: Literal) -> Node:
        """Convert a Celsius literal to Fahrenheit, rounded to 2 decimals."""
        celsius = cast(float, value.toPython())
        return Literal(round(celsius * 1.8 + 32, 2))
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
161 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
162 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Rdfizer(StreamPipelineStep):
    """Pipeline stage that wraps parsed values into sets of quads.

    One output stream is built per :graphStatements plan of the source, and
    the per-plan streams are joined with combine_latest.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return the combined quad-set stream, or an empty stream if the
        source has no :graphStatements plans."""
        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
        log.debug(f'{self.uri=} has {len(plans)=}')
        if plans:
            perPlan = [self.makeQuads(inStream, plan) for plan in plans]
            return reactivex.combine_latest(*perPlan)
        return empty()

    def makeQuads(self, inStream: Observable, plan: Node) -> Observable:
        """Turn each value of `inStream` into a one-quad set for `plan`.

        If the plan has a :statementLifetime, an empty set is also emitted
        once that many seconds pass without a new value (debounce), which
        presumably lets consumers retire the stale statement.
        """

        def quadsFromValue(valueNode):
            # self.uri is used as both the subject and the 4th quad element
            pred = self.config.value(plan, ROOM['outputPredicate'])
            return {(self.uri, pred, valueNode, self.uri)}

        def emptyQuads(element) -> Set[Tuple]:
            return set()

        quads = operators.map(cast(Mapper, quadsFromValue))(inStream)

        dur = self.config.value(plan, ROOM['statementLifetime'])
        if dur is None:
            return quads

        sec = parseDurationLiteral(dur)
        loop = AsyncIOScheduler(asyncio.get_event_loop())
        return quads.pipe(
            operators.debounce(sec, loop),
            operators.map(cast(Mapper, emptyQuads)),
            operators.merge(quads),
        )
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
194 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
195 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
def truncTime():
    """Return the current epoch time in seconds, rounded to 3 decimals."""
    now = time.time()
    return round(now, 3)
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
198 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
199 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
def tightN3(node: Union[URIRef, Literal]) -> str:
    """Return node's N3 text with the XML Schema namespace IRI replaced by 'xsd:'.

    This is a plain substring replacement on the output of node.n3(); nothing
    else in the string is changed.
    """
    xsdNamespace = 'http://www.w3.org/2001/XMLSchema#'
    fullN3 = node.n3()
    return fullN3.replace(xsdNamespace, 'xsd:')
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
202 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
203 |
1629
1c36ad1eb8b3
do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents:
1583
diff
changeset
|
def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
    """Serialize graph as N3 text, with the ROOM namespace bound to the empty prefix.

    graph: graph to serialize. Note that the '' -> ROOM prefix binding is added
      to the graph itself (graph.bind), so the caller's graph is modified.
    hidePrefixes: when true, drop every line whose stripped text starts with
      '@prefix' from the result.

    Returns the N3 document as a str.
    """
    graph.bind('', ROOM)

    # Depending on the rdflib version, serialize() without a destination
    # returns either bytes (older releases) or str (6.0+). Only decode when we
    # actually got bytes; calling .decode on a str raises AttributeError.
    n3 = graph.serialize(format='n3')
    if isinstance(n3, bytes):
        n3 = n3.decode('utf8')

    if hidePrefixes:
        n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
    return n3
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
210 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
211 |
1643
c04b5303eb08
try not to make empty topic subscriptions
drewp@bigasterisk.com
parents:
1642
diff
changeset
|
class EmptyTopicError(ValueError):
    """Raised when a statement source is given an empty MQTT topic."""
c04b5303eb08
try not to make empty topic subscriptions
drewp@bigasterisk.com
parents:
1642
diff
changeset
|
214 |
c04b5303eb08
try not to make empty topic subscriptions
drewp@bigasterisk.com
parents:
1642
diff
changeset
|
215 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
class MqttStatementSource:
    """Feed the messages of one MQTT topic into a named subgraph of masterGraph.

    Every incoming message is converted to a graph (graphFromMessage), run
    through inference, and the inferred statements then replace this source's
    subgraph (context = uri) in masterGraph. A per-source record is kept in
    debugPageData['subscribed'] for the debug UI.
    """

    def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
                 # influxExport: InfluxExporter,
                 inference: Inference):
        """
        uri: identifies this source; also used as the context (4th element) of
          every quad this source writes to masterGraph.
        topic: MQTT topic to subscribe to. Must not be empty.
        masterGraph: shared output graph; this source patches the subgraph
          named by uri.
        mqtt: (deprecated) client used for every topic that does not start
          with b'frontdoorlock'.
        internalMqtt: client used for topics starting with b'frontdoorlock'.
        debugPageData: shared dict for the debug UI. This source appends its
          own record to the 'subscribed' list and increments 'messagesSeen';
          either key is created on demand if the dict lacks it.
        inference: derives the output statements from each message graph.

        Raises EmptyTopicError if topic is b''. The check runs before anything
        is registered or subscribed.
        """
        self.uri = uri

        self.masterGraph = masterGraph
        self.debugPageData = debugPageData
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt
        # self.influxExport = influxExport
        self.inference = inference

        self.mqttTopic = topic
        if self.mqttTopic == b'':
            raise EmptyTopicError(f"empty topic for {uri=}")
        log.debug(f'new mqttTopic {self.mqttTopic}')

        # Per-source record shown on the debug page.
        # NOTE(review): 'currentOutputGraph'['t'] is never updated by this
        # class (only 'n3' is); confirm whether the UI expects a timestamp.
        self.debugSub = {
            'topic': self.mqttTopic.decode('ascii'),
            'recentMessageGraphs': [],
            'recentMetrics': [],
            'currentOutputGraph': {
                't': 1,
                'n3': "(n3)"
            },
        }
        # Callers may hand in a dict without the expected keys (RunState passes
        # an empty one); indexing ['subscribed'] directly would raise KeyError
        # and abort construction of every source.
        self.debugPageData.setdefault('subscribed', []).append(self.debugSub)

        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
        rawBytes.subscribe(on_next=self.countIncomingMessage)

        rawBytes.subscribe(self.onMessage)

    def onMessage(self, raw: bytes):
        """Handle one message payload: record it for debugging, infer, publish."""
        g = graphFromMessage(self.uri, self.mqttTopic, raw)
        logGraph(log.debug, 'message graph', g)
        # appendLimit is defined elsewhere in this file; presumably it appends
        # while bounding the list length -- verify against its definition.
        appendLimit(
            self.debugSub['recentMessageGraphs'],
            {  #
                't': truncTime(),
                'n3': serializeWithNs(g, hidePrefixes=True)
            })

        implied = self.inference.infer(g)
        self.updateMasterGraph(implied)

    def subscribeMqtt(self, topic: bytes):
        """Subscribe to topic on the appropriate broker and return the result of subscribe()."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, msg: bytes):
        """Bump the shared message counter and the MESSAGES_SEEN metric."""
        # .get() so a debugPageData dict without the counter starts at 0
        # instead of raising KeyError on the first message.
        self.debugPageData['messagesSeen'] = self.debugPageData.get('messagesSeen', 0) + 1
        MESSAGES_SEEN.inc()

    def updateMasterGraph(self, newGraph):
        """Replace this source's subgraph in masterGraph with newGraph's statements.

        Each (s, p, o) statement of newGraph is written as a quad whose context
        is self.uri. The debug record's current output N3 is refreshed too.
        """
        log.debug(f'{self.uri} update to {len(newGraph)} statements')

        cg = ConjunctiveGraph()
        for stmt in newGraph:
            cg.add(stmt + (self.uri,))
            # meas = stmt[0].split('/')[-1]
            # if meas.startswith('airQuality'):
            #     where_prefix, type_ = meas[len('airQuality'):].split('door')
            #     where = where_prefix + 'door'
            #     metric = 'air'
            #     tags = {'loc': where.lower(), 'type': type_.lower()}
            #     val = stmt[2].toPython()
            #     if metric not in collectors:
            #         collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

            #     collectors[metric].labels(**tags).set(val)

        self.masterGraph.patchSubgraph(self.uri, cg)
        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
293 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
294 |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
295 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
296 # class DebugPageData(cyclone.sse.SSEHandler): |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
297 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
298 # def __init__(self, application, request): |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
299 # cyclone.sse.SSEHandler.__init__(self, application, request) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
300 # self.lastSent = None |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
301 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
302 # def watch(self): |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
303 # try: |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
304 # dpd = self.settings.debugPageData |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
305 # js = json.dumps(dpd, sort_keys=True) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
306 # if js != self.lastSent: |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
307 # log.debug('sending dpd update') |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
308 # self.sendEvent(message=js.encode('utf8')) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
309 # self.lastSent = js |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
310 # except Exception: |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
311 # import traceback |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
312 # traceback.print_exc() |
1647
34eb87f68ab8
WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents:
1644
diff
changeset
|
313 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
314 # def bind(self): |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
315 # self.loop = task.LoopingCall(self.watch) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
316 # self.loop.start(1, now=True) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
317 |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
318 # def unbind(self): |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
319 # self.loop.stop() |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
320 |
1706
2085ed9cfcc4
reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents:
1647
diff
changeset
|
321 |
1647
34eb87f68ab8
WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents:
1644
diff
changeset
|
322 |
1706
2085ed9cfcc4
reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents:
1647
diff
changeset
|
class RunState:
    """this is rebuilt upon every config reload

    Construction loads conf/rules.n3, installs it as the inference rules,
    publishes the expanded config to expandedConfigPatchableCopy, and creates
    one MqttStatementSource per room:MqttStatementSource subject found in the
    expanded config. The created sources are kept in self.srcs.
    """

    def __init__(
            self,
            expandedConfigPatchableCopy: PatchableGraph,  # for output and display
            masterGraph: PatchableGraph,  # current sensor outputs
            mqtt: MqttClient,
            internalMqtt: MqttClient,
            # influxExport: InfluxExporter,
            inference: Inference,
            debugPageData: Union[dict, None] = None):
        """
        expandedConfigPatchableCopy: receives the expanded config as quads in
          the '/config' context.
        masterGraph: handed to every source as its output graph.
        mqtt, internalMqtt: broker clients handed to every source.
        inference: gets its rules set from the loaded config, and is shared
          with every source.
        debugPageData: optional dict shared by all sources for debug-page
          bookkeeping. When omitted, a fresh dict with the 'messagesSeen' and
          'subscribed' keys the sources use is created. It is available
          afterwards as self.debugPageData.
        """
        # One dict shared by all sources. Previously each source was given its
        # own empty {}, which lacks the 'subscribed'/'messagesSeen' keys the
        # sources index into.
        if debugPageData is None:
            debugPageData = {'messagesSeen': 0, 'subscribed': []}
        self.debugPageData = debugPageData

        loadedConfig = ConjunctiveGraph()
        loadedConfig.parse('conf/rules.n3', format='n3')

        inference.setRules(loadedConfig)
        self.expandedConfig = inference.infer(loadedConfig)
        self.expandedConfig += inference.nonRuleStatements()

        # Publish the expanded config with an explicit context so it can be
        # served as a quad graph.
        ecWithQuads = ConjunctiveGraph()
        for s, p, o in self.expandedConfig:
            ecWithQuads.add((s, p, o, URIRef('/config')))
        expandedConfigPatchableCopy.setToGraph(ecWithQuads)

        self.srcs = []
        srcs = cast(List[URIRef], list(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])))
        srcs.sort(key=str)  # deterministic setup order
        for src in srcs:
            log.info(f'setup source {src=}')
            try:
                self.srcs.append(
                    MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
                                        masterGraph, mqtt=mqtt, internalMqtt=internalMqtt,
                                        debugPageData=debugPageData,
                                        # influxExport=influxExport,
                                        inference=inference))
            except EmptyTopicError as e:
                # Sources without a topic are skipped on purpose, but say so
                # instead of dropping them silently.
                log.warning(f'skipping source: {e}')
                continue
        log.info(f'set up {len(self.srcs)} sources')

    def topicFromConfig(self, config, src) -> bytes:
        """Return src's MQTT topic as bytes.

        The topic is stored in config as an RDF collection at
        (src, room:mqttTopic); its items are ascii-encoded and joined with
        b'/'. An empty collection yields b''.
        """
        topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)
1647
34eb87f68ab8
WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents:
1644
diff
changeset
|
363 |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
1706
diff
changeset
|
def main():
    """Wire up logging, graphs, MQTT clients and sources; return the Starlette app."""
    for loggerName in ('mqtt', 'mqtt_client', 'infer', 'cbind'):
        logging.getLogger(loggerName).setLevel(logging.DEBUG)
    log.setLevel(logging.DEBUG)
    log.info('log start')

    masterGraph = PatchableGraph()
    inference = Inference()

    brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
    brokerPort = 10210

    # The external broker client is deprecated; see MqttStatementSource.subscribeMqtt.
    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)
    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)

    # schema in index.ts
    # NOTE(review): this dict is not passed to anything in this function; the
    # /debugPageData route that served it is commented out below.
    debugPageData = {
        'server': f'{brokerHost}:{brokerPort}',
        'messagesSeen': 0,
        'subscribed': [],
        "rules": "",
        "rulesInferred": "",
    }

    expandedConfigPatchableCopy = PatchableGraph()

    # Constructing RunState loads the config and starts the MQTT subscriptions.
    runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)

    # NOTE(review): the "/" route hands a StaticFiles instance to Route rather
    # than mounting it; confirm this serves index.html as intended.
    routes = [
        Route("/", StaticFiles(directory='.'), name='index.html'),
        # Route("/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
        Route("/graph/config", StaticGraph(expandedConfigPatchableCopy)),
        Route("/graph/mqtt", StaticGraph(masterGraph)),
        Route("/graph/mqtt/events", GraphEvents(masterGraph)),
        # Route('/debugPageData', DebugPageData),
    ]
    app = Starlette(routes=routes)

    app.add_middleware(PrometheusMiddleware, app_name='environment')
    app.add_route("/metrics", handle_metrics)
    return app


# Module-level application object, built once when this module is imported.
app = main()