annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1726:7d3797ed6681

rough port to starlette and reactivex
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:14:28 -0700
parents 2085ed9cfcc4
children 23e6154e6c11
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
4 import asyncio
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
5 import json
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
6 import logging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
7 import time
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
8 from dataclasses import dataclass
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
9 from typing import Callable, List, Set, Tuple, Union, cast
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
10 from mqttrx import MqttClient
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
11 from reactivex import Observable, empty, operators
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
12 import reactivex
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
13 from reactivex.scheduler.eventloop.asyncioscheduler import AsyncIOScheduler
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
14
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
15 from patchablegraph import PatchableGraph
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
16 from patchablegraph.handler import GraphEvents, StaticGraph
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
17 from prometheus_client import Counter
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
18 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
19 from rdflib.graph import ConjunctiveGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
20 from rdflib.term import Node
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
21 from starlette.applications import Starlette
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
22 from starlette.routing import Route
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
23 from starlette.staticfiles import StaticFiles
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
24 from reactivex.typing import Mapper
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
25 from starlette_exporter import handle_metrics
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
26 from starlette_exporter.middleware import PrometheusMiddleware
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
27
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
28 from button_events import button_events
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
29 from inference import Inference
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
30 from mqtt_message import graphFromMessage
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
31
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
# Root logger: getLogger() is called without a name.
log = logging.getLogger()
# Namespace used to build the room vocabulary terms, e.g. ROOM['parser'].
ROOM = Namespace('http://projects.bigasterisk.com/room/')
# Prometheus counter named 'mqtt_messages_seen'. Presumably incremented once
# per received MQTT message; the increment is not in this part of the file
# (TODO confirm).
MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
# NOTE(review): not referenced in this part of the file; purpose unconfirmed.
collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
36
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
def logGraph(debug: Callable, label: str, graph: Graph):
    """Send an N3 dump of ``graph`` to ``debug``, prefixed with ``label``.

    Args:
        debug: Callable that receives the finished message as a single
            string (for example ``log.debug``).
        label: Text placed before the dump; a colon and a newline are
            inserted between the label and the dump.
        graph: Object whose ``serialize(format="n3")`` result is logged.
    """
    n3 = graph.serialize(format="n3")
    # serialize() returns bytes in older rdflib releases and str in newer
    # ones; only bytes need decoding. Calling .decode() on a str would raise.
    if isinstance(n3, bytes):
        n3 = n3.decode('utf8')
    debug(label + ':\n' + n3)
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
40
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
41
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
def appendLimit(lst, elem, n=10):
    """Append ``elem`` to ``lst`` in place, keeping at most ``n`` items.

    When the list is already full, the oldest entries (at the front) are
    removed to make room. ``elem`` is always appended, so the list ends up
    with at least one item even if ``n`` is smaller than 1.

    Args:
        lst: Mutable list to update.
        elem: Item to add at the end.
        n: Maximum length of ``lst`` after the call.
    """
    excess = len(lst) - n + 1
    # Trim only when the list is actually full. A negative slice bound would
    # count from the end and delete entries from a list that still has room.
    if excess > 0:
        del lst[:excess]
    lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
45
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
46
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
def parseDurationLiteral(lit: Literal) -> float:
    """Convert a duration literal such as ``'5s'`` to a number of seconds.

    Only the ``<number>s`` form is supported.

    Args:
        lit: Duration text; must be a float-parseable number followed by a
            single trailing ``s``.

    Returns:
        The number of seconds as a float.

    Raises:
        NotImplementedError: if ``lit`` is in any other form.
    """
    if lit.endswith('s'):
        # Strip only the unit suffix. Splitting on every 's' would silently
        # accept malformed text such as '1s30s' and return 1.0.
        try:
            return float(lit[:-1])
        except ValueError:
            # Fall through to the single "unsupported format" error below.
            pass
    raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
51
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
52
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
@dataclass
class StreamPipelineStep:
    """Base class for one stage of a stream pipeline.

    Subclasses override ``makeOutputStream`` to transform the incoming
    stream; this base implementation passes it through unchanged.
    """
    # a :MqttStatementSource
    uri: URIRef
    # Graph passed in alongside `uri`.
    config: Graph

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return the stream this step produces from ``inStream``."""
        outStream = inStream
        return outStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
60
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
61
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Filters(StreamPipelineStep):
    """Pipeline step that drops payloads not equal to a configured JSON value."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return ``inStream`` filtered by ``:filterPayloadJsonEquals``, if set.

        When ``self.uri`` has no (or an empty) ``:filterPayloadJsonEquals``
        value in ``self.config``, ``inStream`` is returned as is. Otherwise
        only elements whose UTF-8 JSON payload equals the configured JSON
        value are passed on.
        """
        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if not jsonEq:
            return inStream

        required = json.loads(jsonEq.toPython())

        def eq(jsonBytes):
            # An exception raised inside the predicate is delivered to
            # subscribers as on_error, which ends the stream. A payload that
            # cannot be decoded or parsed cannot equal `required`, so treat
            # it as a non-match. UnicodeDecodeError and JSONDecodeError are
            # both ValueError subclasses.
            try:
                msg = json.loads(jsonBytes.decode('utf8'))
            except ValueError:
                return False
            return msg == required

        return operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
77
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
78
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Parser(StreamPipelineStep):
    """Pipeline step that converts raw MQTT payload bytes into RDF nodes.

    The conversion is selected by the ``:parser`` value that ``self.config``
    holds for ``self.uri``.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply the configured parser to every element of ``inStream``."""
        return self.getParser()(inStream)

    def getParser(self) -> Callable[[Observable], Observable]:
        """Build a map operator from the parser type configured on ``self.uri``."""
        configured = self.config.value(self.uri, ROOM['parser'])
        parseFunc = self.getParserFunc(cast(URIRef, configured))
        return operators.map(cast(Mapper, parseFunc))

    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
        """Return the bytes-to-node function for ``parserType``.

        Raises:
            NotImplementedError: if ``parserType`` is not a recognised parser
                and is not typed as a ``:ValueMap`` in ``self.config``.
        """
        if parserType == XSD.double:

            def parseDouble(v):
                return Literal(float(v))

            return parseDouble

        if parserType == ROOM['tagIdToUri']:
            return self.tagIdToUri

        if parserType == ROOM['onOffBrightness']:

            def parseOnOff(v):
                # Exactly b'OFF' is 0.0; any other payload counts as on.
                if v == b'OFF':
                    return Literal(0.0)
                return Literal(1.0)

            return parseOnOff

        if parserType == ROOM['jsonBrightness']:
            return self.parseJsonBrightness

        # This check runs before the rfCode/tradfri checks, as in the
        # original chain; keep the order.
        if ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):

            def parseViaMap(v):
                return self.remap(parserType, v.decode('utf8'))

            return parseViaMap

        if parserType == ROOM['rfCode']:
            return self.parseJsonRfCode

        if parserType == ROOM['tradfri']:
            return self.parseTradfriMessage

        raise NotImplementedError(parserType)

    def tagIdToUri(self, value: bytes) -> URIRef:
        """Turn a hex tag id (dashes allowed) into an rfidCard URI.

        Raises:
            ValueError: if the id, with dashes removed, is not accepted by
                ``int(..., 16)``.
        """
        text = value.decode('utf8')
        justHex = text.replace('-', '').lower()
        int(justHex, 16)  # validate
        return URIRef('http://bigasterisk.com/rfidCard/' + justHex)

    def parseJsonBrightness(self, mqttValue: bytes):
        """Return ``brightness / 255`` as a Literal when state is 'ON', else 0.0."""
        msg = json.loads(mqttValue.decode('utf8'))
        if msg['state'] == 'ON':
            level = float(msg['brightness'] / 255)
        else:
            level = 0.0
        return Literal(level)

    def remap(self, parser, valueStr: str) -> Node:
        """Look ``valueStr`` up in the ``:map`` entries of ``parser``.

        Returns the ``:to`` node of the first entry whose ``:from`` equals
        ``Literal(valueStr)``.

        Raises:
            TypeError: if the matching entry's ``:to`` value is not a Node.
            KeyError: if no entry matches.
        """
        g = self.config
        value = Literal(valueStr)
        for entry in g.objects(parser, ROOM['map']):
            matched = value == g.value(entry, ROOM['from'])
            if not matched:
                continue
            to_ = g.value(entry, ROOM['to'])
            if isinstance(to_, Node):
                return to_
            raise TypeError(f'{to_=}')
        raise KeyError(value)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Return code0 and code1 as one Literal of two 8-digit hex fields."""
        msg = json.loads(mqttValue.decode('utf8'))
        codes = (msg['code0'], msg['code1'])
        return Literal('%08x%08x' % codes)

    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
        """Log the payload and return a placeholder Literal (not implemented yet)."""
        log.info(f'trad {mqttValue}')
        return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
135
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
136
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Converters(StreamPipelineStep):
    """Pipeline step that applies the ``:conversions`` list configured for ``uri``."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Chain every configured conversion onto ``inStream``, in list order."""
        g = self.config
        listHead = g.value(self.uri, ROOM['conversions'])
        stream = inStream
        for conv in g.items(listHead):
            step = self.conversionStep(conv)
            stream = step(stream)
        return stream

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the stream operator for a single conversion node.

        Raises:
            NotImplementedError: if ``conv`` is not a recognised conversion.
        """
        g = self.config

        if conv == ROOM['celsiusToFarenheit']:
            return operators.map(cast(Mapper, self.c2f))

        limit = g.value(conv, ROOM['ignoreValueBelow'], default=None)
        if limit is not None:
            threshold = cast(Literal, limit).toPython()

            def atLeastThreshold(value):
                return cast(Literal, value).toPython() >= threshold

            return operators.filter(atLeastThreshold)

        if conv == ROOM['buttonPress']:
            return button_events(min_hold_sec=1.0, release_after_sec=1.0)

        raise NotImplementedError(conv)

    def c2f(self, value: Literal) -> Node:
        """Convert a Celsius Literal to Fahrenheit, rounded to 2 decimals."""
        celsius = cast(float, value.toPython())
        return Literal(round(celsius * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
161
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
162
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Rdfizer(StreamPipelineStep):
    """Pipeline step that turns value nodes into sets of RDF quads."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Combine the quad streams of every ``:graphStatements`` plan.

        Returns an empty observable when ``self.uri`` has no plans.
        """
        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
        log.debug(f'{self.uri=} has {len(plans)=}')
        if not plans:
            return empty()
        perPlan = [self.makeQuads(inStream, plan) for plan in plans]
        return reactivex.combine_latest(*perPlan)

    def makeQuads(self, inStream: Observable, plan: Node) -> Observable:
        """Map each value in ``inStream`` to a one-quad set for ``plan``.

        If ``plan`` has a ``:statementLifetime``, the returned stream also
        carries an empty set produced from the debounced quad stream.
        """

        def quadsFromValue(valueNode):
            # The predicate is looked up per element, not once up front.
            pred = self.config.value(plan, ROOM['outputPredicate'])
            return {(self.uri, pred, valueNode, self.uri)}

        def emptyQuads(element) -> Set[Tuple]:
            return set()

        quads = inStream.pipe(operators.map(cast(Mapper, quadsFromValue)))

        dur = self.config.value(plan, ROOM['statementLifetime'])
        if dur is None:
            return quads

        sec = parseDurationLiteral(dur)
        scheduler = AsyncIOScheduler(asyncio.get_event_loop())
        # Debounced copy of the quad stream, with each element replaced by an
        # empty set.
        expirations = quads.pipe(
            operators.debounce(sec, scheduler),
            operators.map(cast(Mapper, emptyQuads)),
        )
        return expirations.pipe(operators.merge(quads))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
194
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
195
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
def truncTime():
    """Return the current ``time.time()`` value rounded to 3 decimal places."""
    now = time.time()
    return round(now, ndigits=3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
198
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
199
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
def tightN3(node: Union[URIRef, Literal]) -> str:
    """Return the n3 form of `node` with the XML Schema namespace URI shortened to 'xsd:'.

    This is a plain text substitution on the output of `node.n3()`; any
    surrounding characters in that output are left untouched.
    """
    xsdNamespace = 'http://www.w3.org/2001/XMLSchema#'
    fullN3 = node.n3()
    return fullN3.replace(xsdNamespace, 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
202
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
203
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
    """Serialize `graph` to n3 text with ROOM bound as the empty ('') prefix.

    Note that this binds the prefix on the passed-in graph itself.

    Args:
        graph: the graph to serialize.
        hidePrefixes: when true, drop every output line that starts with
            '@prefix' (after stripping whitespace) from the result.

    Returns:
        The n3 serialization as a str.
    """
    graph.bind('', ROOM)
    serialized = graph.serialize(format='n3')
    # Older rdflib returns bytes here; rdflib >= 6 returns str when no
    # encoding is requested. Accept both instead of assuming bytes.
    n3 = serialized.decode('utf8') if isinstance(serialized, bytes) else serialized
    if hidePrefixes:
        n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
    return n3
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
211
1643
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
class EmptyTopicError(ValueError):
    """Raised when a statement source would be created with an empty mqtt topic."""
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
214
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
215
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
class MqttStatementSource:
    """Subscribes to one mqtt topic and keeps a subgraph of masterGraph updated.

    Each incoming message is turned into a graph, run through inference, and
    the implied statements replace this source's subgraph (named by `uri`) in
    the master graph. Debug info about recent messages is kept in `debugSub`.
    """

    def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
                 # influxExport: InfluxExporter,
                 inference: Inference):
        """Store collaborators, register debug info, and subscribe to `topic`.

        Args:
            uri: identifies this source; also used as the graph context for
                its output statements.
            topic: mqtt topic to subscribe to.
            masterGraph: graph that receives this source's output subgraph.
            mqtt: client used for every topic except 'frontdoorlock*' (deprecated).
            internalMqtt: client used for topics starting with 'frontdoorlock'.
            debugPageData: mutable dict of debug state; its 'subscribed' list
                and 'messagesSeen' counter are created here if missing.
            inference: object whose `infer(graph)` produces the output graph.

        Raises:
            EmptyTopicError: if `topic` is b''.
        """
        self.uri = uri

        self.masterGraph = masterGraph
        self.debugPageData = debugPageData
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt
        # self.influxExport = influxExport
        self.inference = inference

        self.mqttTopic = topic
        if self.mqttTopic == b'':
            raise EmptyTopicError(f"empty topic for {uri=}")
        log.debug(f'new mqttTopic {self.mqttTopic}')

        self.debugSub = {
            'topic': self.mqttTopic.decode('ascii'),
            'recentMessageGraphs': [],
            'recentMetrics': [],
            'currentOutputGraph': {
                't': 1,
                'n3': "(n3)"
            },
        }
        # Callers may hand in a dict without the expected keys (e.g. `{}`);
        # create the list on demand instead of raising KeyError.
        self.debugPageData.setdefault('subscribed', []).append(self.debugSub)

        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
        rawBytes.subscribe(on_next=self.countIncomingMessage)

        rawBytes.subscribe(self.onMessage)

    def onMessage(self, raw: bytes):
        """Convert one raw mqtt payload to statements and publish the inferred result."""
        g = graphFromMessage(self.uri, self.mqttTopic, raw)
        logGraph(log.debug, 'message graph', g)
        appendLimit(
            self.debugSub['recentMessageGraphs'],
            {  #
                't': truncTime(),
                'n3': serializeWithNs(g, hidePrefixes=True)
            })

        implied = self.inference.infer(g)
        self.updateMasterGraph(implied)

    def subscribeMqtt(self, topic: bytes):
        """Subscribe to `topic` on the appropriate client and return the result of `subscribe`."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, msg: bytes):
        """Bump the debug-page and prometheus message counters; `msg` itself is unused."""
        # Tolerate a debugPageData dict that was created without the counter.
        self.debugPageData['messagesSeen'] = self.debugPageData.get('messagesSeen', 0) + 1
        MESSAGES_SEEN.inc()

    def updateMasterGraph(self, newGraph):
        """Replace this source's subgraph in masterGraph with the statements of `newGraph`."""
        log.debug(f'{self.uri} update to {len(newGraph)} statements')

        cg = ConjunctiveGraph()
        for stmt in newGraph:
            # Turn each triple into a quad whose context is this source's uri.
            cg.add(stmt + (self.uri,))
            # meas = stmt[0].split('/')[-1]
            # if meas.startswith('airQuality'):
            #     where_prefix, type_ = meas[len('airQuality'):].split('door')
            #     where = where_prefix + 'door'
            #     metric = 'air'
            #     tags = {'loc': where.lower(), 'type': type_.lower()}
            #     val = stmt[2].toPython()
            #     if metric not in collectors:
            #         collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

            #     collectors[metric].labels(**tags).set(val)

        self.masterGraph.patchSubgraph(self.uri, cg)
        currentOutput = self.debugSub['currentOutputGraph']
        # Record when the output changed; previously 't' stayed at its
        # placeholder value forever while 'n3' was refreshed.
        currentOutput['t'] = truncTime()
        currentOutput['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
293
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
294
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
295
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
296 # class DebugPageData(cyclone.sse.SSEHandler):
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
297
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
298 # def __init__(self, application, request):
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
299 # cyclone.sse.SSEHandler.__init__(self, application, request)
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
300 # self.lastSent = None
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
301
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
302 # def watch(self):
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
303 # try:
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
304 # dpd = self.settings.debugPageData
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
305 # js = json.dumps(dpd, sort_keys=True)
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
306 # if js != self.lastSent:
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
307 # log.debug('sending dpd update')
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
308 # self.sendEvent(message=js.encode('utf8'))
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
309 # self.lastSent = js
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
310 # except Exception:
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
311 # import traceback
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
312 # traceback.print_exc()
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
313
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
314 # def bind(self):
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
315 # self.loop = task.LoopingCall(self.watch)
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
316 # self.loop.start(1, now=True)
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
317
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
318 # def unbind(self):
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
319 # self.loop.stop()
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
320
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
321
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
322
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
class RunState:
    """this is rebuilt upon every config reload"""

    def __init__(self,
                 expandedConfigPatchableCopy: PatchableGraph,  # for output and display
                 masterGraph: PatchableGraph,  # current sensor outputs
                 mqtt: MqttClient,
                 internalMqtt: MqttClient,
                 # influxExport: InfluxExporter,
                 inference: Inference,
                 debugPageData=None):
        """Load conf/rules.n3, expand it with inference, and create the statement sources.

        Args:
            expandedConfigPatchableCopy: receives a copy of the expanded
                config, with every statement placed in the '/config' context.
            masterGraph: passed to each MqttStatementSource for its output.
            mqtt: passed to each MqttStatementSource.
            internalMqtt: passed to each MqttStatementSource.
            inference: gets its rules set from the loaded config and is then
                used both to expand the config and by every source.
            debugPageData: optional dict of debug state shared by all sources.
                When omitted, a dict with the 'messagesSeen' and 'subscribed'
                keys that MqttStatementSource updates is created.
        """
        loadedConfig = ConjunctiveGraph()
        loadedConfig.parse('conf/rules.n3', format='n3')

        inference.setRules(loadedConfig)
        self.expandedConfig = inference.infer(loadedConfig)
        self.expandedConfig += inference.nonRuleStatements()

        configContext = URIRef('/config')
        ecWithQuads = ConjunctiveGraph()
        for s, p, o in self.expandedConfig:
            ecWithQuads.add((s, p, o, configContext))
        expandedConfigPatchableCopy.setToGraph(ecWithQuads)

        # MqttStatementSource indexes into 'subscribed' and 'messagesSeen';
        # handing it a bare {} made construction fail with KeyError.
        if debugPageData is None:
            debugPageData = {'messagesSeen': 0, 'subscribed': []}
        self.debugPageData = debugPageData

        self.srcs = []
        srcs = cast(List[URIRef], list(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])))
        srcs.sort(key=str)  # stable setup order regardless of graph iteration order
        for src in srcs:
            log.info(f'setup source {src=}')
            try:
                source = MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
                                             masterGraph, mqtt=mqtt, internalMqtt=internalMqtt,
                                             debugPageData=self.debugPageData,
                                             # influxExport=influxExport,
                                             inference=inference)
            except EmptyTopicError:
                # Sources without a usable topic are skipped on purpose, but say so.
                log.warning(f'skipping source with empty topic {src=}')
                continue
            self.srcs.append(source)
        log.info(f'set up {len(self.srcs)} sources')

    def topicFromConfig(self, config, src) -> bytes:
        """Join the items of the list at `src`'s ROOM['mqttTopic'] value into a '/'-separated topic."""
        topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
363
1726
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
def main():
    """Build and return the Starlette application for this service.

    Sets several loggers to DEBUG, creates the sensor-output graph, the
    inference object and both MqttClient instances, builds a RunState from
    them, and registers the graph routes plus the prometheus middleware and
    /metrics route.
    """
    for loggerName in ('mqtt', 'mqtt_client', 'infer', 'cbind'):
        logging.getLogger(loggerName).setLevel(logging.DEBUG)
    log.setLevel(logging.DEBUG)
    log.info('log start')

    sensorGraph = PatchableGraph()
    inference = Inference()

    brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
    brokerPort = 10210

    externalMqtt = MqttClient(
        clientId='mqtt_to_rdf',
        brokerHost='mosquitto-ext.default.svc.cluster.local',
        brokerPort=1883,
    )  # deprecated
    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)

    # Not passed to anything in this function; the route that served it is
    # commented out below.
    debugPageData = {
        # schema in index.ts
        'server': f'{brokerHost}:{brokerPort}',
        'messagesSeen': 0,
        'subscribed': [],
        "rules": "",
        "rulesInferred": "",
    }

    configGraph = PatchableGraph()

    # Held in a local only; nothing else in this function reads it.
    runState = RunState(configGraph, sensorGraph, externalMqtt, internalMqtt, inference)

    routes = [
        Route("/", StaticFiles(directory='.'), name='index.html'),
        # Route("/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
        Route("/graph/config", StaticGraph(configGraph)),
        Route("/graph/mqtt", StaticGraph(sensorGraph)),
        Route("/graph/mqtt/events", GraphEvents(sensorGraph)),
        # Route('/debugPageData', DebugPageData),
    ]
    app = Starlette(routes=routes)

    app.add_middleware(PrometheusMiddleware, app_name='environment')
    app.add_route("/metrics", handle_metrics)
    return app
7d3797ed6681 rough port to starlette and reactivex
drewp@bigasterisk.com
parents: 1706
diff changeset
# Module-level application object: main() runs at import time and its return
# value is bound to `app` (presumably so an ASGI server can load it — confirm
# against the deployment command).
app = main()