annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1706:2085ed9cfcc4

reworking UI to reflect the new inferencing code
author drewp@bigasterisk.com
date Sat, 23 Oct 2021 13:22:40 -0700
parents 34eb87f68ab8
children 7d3797ed6681
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
4 import glob
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
5 import json
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
6 import logging
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
7 import os
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
8
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
9 from rdfdb.patch import Patch
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
10
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
11 from mqtt_message import graphFromMessage
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
12 import os
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
13 import time
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
14 from dataclasses import dataclass
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
15 from pathlib import Path
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
16 from typing import Callable, Set, Tuple, Union, cast
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
17
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
18 import cyclone.sse
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
19 import cyclone.web
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
20 import export_to_influxdb
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
21 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
22 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
23 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
24 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
25 from docopt import docopt
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
26 from export_to_influxdb import InfluxExporter
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
27 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
29 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
30 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
31 from prometheus_client.registry import REGISTRY
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
32 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
33 from rdflib.graph import ConjunctiveGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
34 from rdflib.term import Node
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
35 from rx.core.observable.observable import Observable
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
36 from rx.core.typing import Mapper
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
37 from standardservice.logsetup import log, verboseLogging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
38 from twisted.internet import reactor, task
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
39 from inference import Inference
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
40 from button_events import button_events
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
41 from patch_cyclone_sse import patchCycloneSse
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
42
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
43 ROOM = Namespace('http://projects.bigasterisk.com/room/')
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
44 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
45 collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
46
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
47 patchCycloneSse()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
48
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
49
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
50 def logGraph(debug: Callable, label: str, graph: Graph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
51 n3 = cast(bytes, graph.serialize(format="n3"))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
52 debug(label + ':\n' + n3.decode('utf8'))
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
53
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
54
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
55 def appendLimit(lst, elem, n=10):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
56 del lst[:max(0, len(lst) - n + 1)]
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
57 lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
58
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
59
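The bounded-append helper above can be tried in isolation. This is a minimal sketch, not the service's own code: `append_limit` is a hypothetical stand-alone variant that only trims when the list would exceed `n`, so a negative slice bound never removes items from a short list.

```python
def append_limit(lst, elem, n=10):
    """Append elem, keeping at most the n most recent items in lst (n >= 1 assumed)."""
    excess = len(lst) - n + 1  # how many old items must go to make room
    if excess > 0:
        del lst[:excess]
    lst.append(elem)


recent = []
for i in range(25):
    append_limit(recent, i, n=10)

short = list(range(8))
append_limit(short, 8, n=10)  # below the limit, so nothing is dropped
```

After the loop, `recent` holds only the last ten values appended, and `short` keeps all nine of its items.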
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
60 def parseDurationLiteral(lit: Literal) -> float:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
61 if lit.endswith('s'):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
62 return float(lit.split('s')[0])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
63 raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
64
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
65
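A small sketch of the duration parsing above, assuming the literal behaves like a plain string such as `'10s'`. `parse_duration` is a hypothetical name; it takes a `str` so it runs without rdflib, and it strips only the trailing unit character.

```python
def parse_duration(text: str) -> float:
    """Return seconds for a literal like '10s'; other units are not handled."""
    if text.endswith('s'):
        return float(text[:-1])  # drop the trailing 's' only
    raise NotImplementedError(f'duration literal: {text}')


ten = parse_duration('10s')
half = parse_duration('0.5s')
```

Any unit other than seconds (for example `'5m'`) raises `NotImplementedError`, matching the behaviour of the function above.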
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
66 @dataclass
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
67 class StreamPipelineStep:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
68 uri: URIRef # a :MqttStatementSource
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
69 config: Graph
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
70
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
71 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
72 return inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
73
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
74
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
75 class Filters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
76
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
77 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
78 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
79 if jsonEq:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
80 required = json.loads(jsonEq.toPython())
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
81
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
82 def eq(jsonBytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
83 msg = json.loads(jsonBytes.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
84 return msg == required
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
85
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
86 outStream = rx.operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
87 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
88 outStream = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
89 return outStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
90
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
91
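The payload filter in `Filters` compares decoded JSON values rather than raw bytes, so key order and whitespace in the MQTT payload do not matter. A minimal sketch of that predicate without the rx stream around it; `make_json_eq` is a hypothetical helper name.

```python
import json


def make_json_eq(required_json: str):
    """Build a predicate that is true when a payload decodes to the required JSON value."""
    required = json.loads(required_json)

    def eq(payload: bytes) -> bool:
        return json.loads(payload.decode('utf8')) == required

    return eq


is_match = make_json_eq('{"state": "ON", "level": 3}')
same_value = is_match(b'{"level":3,   "state":"ON"}')   # different order and spacing
other_value = is_match(b'{"level": 4, "state": "ON"}')
```

In the class above, a predicate of this shape is what gets passed to `rx.operators.filter`.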
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
92 class Parser(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
93
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
94 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
95 parser = self.getParser()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
96 return parser(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
97
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
98 def getParser(self) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
99 parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
100 func = self.getParserFunc(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
101 return rx.operators.map(cast(Mapper, func))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
102
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
103 def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
104 if parserType == XSD.double:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
105 return lambda v: Literal(float(v))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
106 elif parserType == ROOM['tagIdToUri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
107 return self.tagIdToUri
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
108 elif parserType == ROOM['onOffBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
109 return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
110 elif parserType == ROOM['jsonBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
111 return self.parseJsonBrightness
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
112 elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
113 return lambda v: self.remap(parserType, v.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
114 elif parserType == ROOM['rfCode']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
115 return self.parseJsonRfCode
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
116 elif parserType == ROOM['tradfri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
117 return self.parseTradfriMessage
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
118 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
119 raise NotImplementedError(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
120
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
121 def tagIdToUri(self, value: bytes) -> URIRef:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
122 justHex = value.decode('utf8').replace('-', '').lower()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
123 int(justHex, 16) # validate
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
124 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
125
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
126 def parseJsonBrightness(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
127 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
128 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
129
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
130 def remap(self, parser, valueStr: str) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
131 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
132 value = Literal(valueStr)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
133 for entry in g.objects(parser, ROOM['map']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
134 if value == g.value(entry, ROOM['from']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
135 to_ = g.value(entry, ROOM['to'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
136 if not isinstance(to_, Node):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
137 raise TypeError(f'{to_=}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
138 return to_
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
139 raise KeyError(value)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
140
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
141 def parseJsonRfCode(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
142 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
143 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
144
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
145 def parseTradfriMessage(self, mqttValue: bytes) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
146 log.info(f'trad {mqttValue}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
147 return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
148
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
149
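Two of the `Parser` helpers are plain string formatting and can be checked without rdflib. The sketch below mirrors that logic but returns `str` instead of `URIRef`/`Literal`; `tag_id_to_hex` and `rf_code_hex` are hypothetical names and the sample inputs are made up.

```python
def tag_id_to_hex(value: bytes) -> str:
    """Normalize an RFID tag id such as b'04-A2-3F' to lowercase hex."""
    just_hex = value.decode('utf8').replace('-', '').lower()
    int(just_hex, 16)  # validate: raises ValueError if not hexadecimal
    return just_hex


def rf_code_hex(code0: int, code1: int) -> str:
    """Format two integer codes as a single 16-digit hex string."""
    return '%08x%08x' % (code0, code1)


tag = tag_id_to_hex(b'04-A2-3F')
code = rf_code_hex(1, 255)
```

The `int(just_hex, 16)` call is there only for validation; its result is discarded.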
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
150 class Converters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
151
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
152 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
153 out = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
154 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
155 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
156 out = self.conversionStep(conv)(out)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
157 return out
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
158
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
159 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
160 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
161 if conv == ROOM['celsiusToFarenheit']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
162
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
163 return rx.operators.map(cast(Mapper, self.c2f))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
164 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
165 threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
166 return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
167 elif conv == ROOM['buttonPress']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
168 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
169 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
170 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
171 raise NotImplementedError(conv)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
172
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
173 def c2f(self, value: Literal) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
174 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
175
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
176
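The arithmetic behind two of the `Converters` steps, sketched on plain floats rather than `Literal` values and lists rather than rx streams. `keep_at_or_above` is a hypothetical helper standing in for the `ignoreValueBelow` filter.

```python
def c2f(celsius: float) -> float:
    """Convert Celsius to Fahrenheit, rounded to two decimals."""
    return round(celsius * 1.8 + 32, 2)


def keep_at_or_above(values, threshold):
    """Keep values that are >= threshold, as the ignoreValueBelow step does."""
    return [v for v in values if v >= threshold]


boiling = c2f(100)
freezing = c2f(0)
kept = keep_at_or_above([0.5, 2.0, 1.0, 3.5], threshold=1.0)
```

Note that a value exactly equal to the threshold is kept, since the comparison is `>=`.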
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
177 class Rdfizer(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
178
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
179 def makeOutputStream(self, inStream: Observable) -> Observable:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
180 plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
181 log.debug(f'{self.uri=} has {len(plans)=}')
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
182 if not plans:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
183 return rx.empty()
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
184 outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
185 return outputQuadsSets
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
186
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
187 def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
188
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
189 def quadsFromValue(valueNode):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
190 return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
191
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
192 def emptyQuads(element) -> Set[Tuple]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
193 return set([])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
194
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
195 quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
196
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
197 dur = self.config.value(plan, ROOM['statementLifetime'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
198 if dur is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
199 sec = parseDurationLiteral(dur)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
200 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
201 quads = quads.pipe(
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
202 rx.operators.debounce(sec, loop),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
203 rx.operators.map(cast(Mapper, emptyQuads)),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
204 rx.operators.merge(quads),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
205 )
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
206
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
207 return quads
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
208
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
209
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210 def truncTime():
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
211 return round(time.time(), 3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
212
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
213
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
214 def tightN3(node: Union[URIRef, Literal]) -> str:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
215 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
216
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
217
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
218 def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
219 graph.bind('', ROOM)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
220 n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8')
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
221 if hidePrefixes:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
222 n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
223 return n3
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
224
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
225
1643
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
226 class EmptyTopicError(ValueError):
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
227 pass
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
228
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
229
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
230 class MqttStatementSource:
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
231
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
232 def __init__(self, uri: URIRef, topic: bytes, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
233 # influxExport: InfluxExporter,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
234 inference: Inference):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
235 self.uri = uri
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
236
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
237 self.masterGraph = masterGraph
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
238 self.debugPageData = debugPageData
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
239 self.mqtt = mqtt # deprecated
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
240 self.internalMqtt = internalMqtt
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
241 # self.influxExport = influxExport
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
242 self.inference = inference
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
243
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
244 self.mqttTopic = topic
1643
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
245 if self.mqttTopic == b'':
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
246 raise EmptyTopicError(f"empty topic for {uri=}")
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
247 log.debug(f'new mqttTopic {self.mqttTopic}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
248
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
249 self.debugSub = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
250 'topic': self.mqttTopic.decode('ascii'),
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
251 'recentMessageGraphs': [],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
252 'recentMetrics': [],
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
253 'currentOutputGraph': {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
254 't': 1,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
255 'n3': "(n3)"
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
256 },
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
257 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
258 self.debugPageData['subscribed'].append(self.debugSub)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
259
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
260 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
261 rawBytes.subscribe(on_next=self.countIncomingMessage)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
262
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
263 rawBytes.subscribe_(self.onMessage)
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
264
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
265 def onMessage(self, raw: bytes):
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
266 g = graphFromMessage(self.uri, self.mqttTopic, raw)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
267 logGraph(log.debug, 'message graph', g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
268 appendLimit(
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
269 self.debugSub['recentMessageGraphs'],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
270 { #
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
271 't': truncTime(),
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
272 'n3': serializeWithNs(g, hidePrefixes=True)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
273 })
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
274
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
275 implied = self.inference.infer(g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
276 self.updateMasterGraph(implied)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
277
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
278 def subscribeMqtt(self, topic: bytes):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
279 # goal is to get everyone on the internal broker and eliminate this
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
280 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
281 return mqtt.subscribe(topic)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
282
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
283 def countIncomingMessage(self, msg: bytes):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
284 self.debugPageData['messagesSeen'] += 1
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
285 MESSAGES_SEEN.inc()
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
286
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
287 def updateInflux(self, newGraphs):
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
288 for g in newGraphs:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
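289 self.influxExport.exportToInflux(g)  # NOTE: the self.influxExport assignment is commented out in __init__ (line 241), so calling this would raise AttributeError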
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
290
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
291 def updateMasterGraph(self, newGraph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
292 log.debug(f'{self.uri} update to {len(newGraph)} statements')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
293
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
294 cg = ConjunctiveGraph()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
295 for stmt in newGraph:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
296 cg.add(stmt + (self.uri,))
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
297 # meas = stmt[0].split('/')[-1]
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
298 # if meas.startswith('airQuality'):
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
299 # where_prefix, type_ = meas[len('airQuality'):].split('door')
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
300 # where = where_prefix + 'door'
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
301 # metric = 'air'
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
302 # tags = {'loc': where.lower(), 'type': type_.lower()}
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
303 # val = stmt[2].toPython()
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
304 # if metric not in collectors:
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
305 # collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
306
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
307 # collectors[metric].labels(**tags).set(val)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
308
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
309 self.masterGraph.patchSubgraph(self.uri, cg)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
310 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
311
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
312
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
313 class Metrics(cyclone.web.RequestHandler):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
314
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
315 def get(self):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
316 self.add_header('content-type', 'text/plain')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
317 self.write(generate_latest(REGISTRY))
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
318
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
319
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
320 class DebugPageData(cyclone.sse.SSEHandler):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
321
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
322 def __init__(self, application, request):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
323 cyclone.sse.SSEHandler.__init__(self, application, request)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
324 self.lastSent = None
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
325
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
326 def watch(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
327 try:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
328 dpd = self.settings.debugPageData
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
329 js = json.dumps(dpd, sort_keys=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
330 if js != self.lastSent:
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
331 log.debug('sending dpd update')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
332 self.sendEvent(message=js.encode('utf8'))
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
333 self.lastSent = js
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
334 except Exception:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
335 import traceback
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
336 traceback.print_exc()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
337
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
338 def bind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
339 self.loop = task.LoopingCall(self.watch)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
340 self.loop.start(1, now=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
341
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
342 def unbind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
343 self.loop.stop()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
344
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
345
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
346 # @dataclass
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
347 # class WatchFiles:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
348 # # could be merged with rdfdb.service and GraphFile code
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
349 # globPattern: str
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
350 # outGraph: Graph
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
351
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
352 # def __post_init__(self):
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
353 # self.lastUpdate = 0
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
354 # task.LoopingCall(self.refresh).start(1)
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
355 # log.info(f'start watching {self.globPattern}')
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
356
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
357 # def refresh(self):
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
358 # files = glob.glob(self.globPattern)
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
359 # for fn in files:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
360 # if os.path.getmtime(fn) > self.lastUpdate:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
361 # break
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
362 # else:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
363 # return
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
364 # self.lastUpdate = time.time()
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
365 # self.outGraph.remove((None, None, None))
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
366 # log.info('reread config')
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
367 # for fn in files:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
368 # # todo: handle read errors
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
369 # self.outGraph.parse(fn, format='n3')
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
370 # # and notify this change, so we can recalc the latest output
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
371
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
372
1706
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
373 class RunState:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
374 """this is rebuilt upon every config reload"""
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
375 def __init__(self,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
376 expandedConfigPatchableCopy: PatchableGraph, # for output and display
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
377 masterGraph: PatchableGraph, # current sensor outputs
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
378 mqtt: MqttClient,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
379 internalMqtt: MqttClient,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
380 # influxExport: InfluxExporter,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
381 inference: Inference):
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
382 loadedConfig = ConjunctiveGraph()
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
383 loadedConfig.parse('conf/rules.n3', format='n3')
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
384
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
385 inference.setRules(loadedConfig)
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
386 self.expandedConfig = inference.infer(loadedConfig)
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
387 self.expandedConfig += inference.nonRuleStatements()
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
388
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
389 ecWithQuads = ConjunctiveGraph()
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
390 for s, p, o in self.expandedConfig:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
391 ecWithQuads.add((s, p, o, URIRef('/config')))
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
392 expandedConfigPatchableCopy.setToGraph(ecWithQuads)
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
393
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
394 self.srcs = []
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
395 for src in sorted(self.expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
396 log.info(f'setup source {src=}')
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
397 try:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
398 self.srcs.append(
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
399 MqttStatementSource(src, self.topicFromConfig(self.expandedConfig, src),
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
400 masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData,  # debugPageData is not a parameter of RunState.__init__; presumably a module-level global
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
401 # influxExport=influxExport,
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
402 inference=inference))
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
403 except EmptyTopicError:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
404 continue
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
405 log.info(f'set up {len(self.srcs)} sources')
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
406
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
407 def topicFromConfig(self, config, src) -> bytes:
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
408 topicParts = list(config.items(config.value(src, ROOM['mqttTopic'])))
2085ed9cfcc4 reworking UI to reflect the new inferencing code
drewp@bigasterisk.com
parents: 1647
diff changeset
409 return b'/'.join(t.encode('ascii') for t in topicParts)
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
410
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
411
 718  412  if __name__ == '__main__':
 718  413      arg = docopt("""
 733  414      Usage: mqtt_to_rdf.py [options]
 718  415
 718  416      -v           Verbose
 718  417      --cs=STR     Only process config filenames with this substring
 718  418      """)
 718  419      verboseLogging(arg['-v'])
1577  420      logging.getLogger('mqtt').setLevel(logging.INFO)
1577  421      logging.getLogger('mqtt_client').setLevel(logging.INFO)
1629  422      logging.getLogger('infer').setLevel(logging.INFO)
1647  423      logging.getLogger('cbind').setLevel(logging.INFO)
1647  424      # log.setLevel(logging.DEBUG)
1629  425      log.info('log start')
 718  426
1706  427      # config = ConjunctiveGraph()
1706  428      # watcher = WatchFiles('conf/rules.n3', config)
 718  429      masterGraph = PatchableGraph()
1706  430      inference = Inference()
 718  431
 799  432      brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
 799  433      brokerPort = 10210
 799  434
1706  435      mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
1706  436      internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
1706  437
1706  438      # needs rework since the config can change:
1706  439      # influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
1706  440
 799  441      debugPageData = {
 799  442          # schema in index.ts
 799  443          'server': f'{brokerHost}:{brokerPort}',
 799  444          'messagesSeen': 0,
 799  445          'subscribed': [],
1706  446          "rules": "",
1706  447          "rulesInferred": "",
 799  448      }
 799  449
1706  450      expandedConfigPatchableCopy = PatchableGraph()
1577  451
1706  452      runState = RunState(expandedConfigPatchableCopy, masterGraph, mqtt, internalMqtt, inference)
1642  453
 718  454      port = 10018
 780  455      reactor.listenTCP(port,
 780  456                        cyclone.web.Application([
1706  457                            (r"/()", cyclone.web.StaticFileHandler, {"path": ".", "default_filename": "index.html"}),
1706  458                            (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {"path": "build"}),
1706  459                            (r"/graph/config", CycloneGraphHandler, {'masterGraph': expandedConfigPatchableCopy}),
1706  460                            (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
1706  461                            (r"/graph/mqtt/events", CycloneGraphEventsHandler, {'masterGraph': masterGraph}),
 799  462                            (r'/debugPageData', DebugPageData),
 796  463                            (r'/metrics', Metrics),
 780  464                        ],
1706  465                                                # mqtt=mqtt,
1706  466                                                # internalMqtt=internalMqtt,
1706  467                                                # masterGraph=masterGraph,
 799  468                                                debugPageData=debugPageData,
 780  469                                                debug=arg['-v']),
 718  470                        interface='::')
1577  471      log.info('serving on %s', port)
 718  472
 718  473      reactor.run()
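
The topicFromConfig method above reads an RDF list of topic segments from the config graph and joins them with b'/' to form the MQTT topic. Below is a minimal standalone sketch of just that joining step. It assumes the segments behave like plain Python strings; the function name topic_from_parts and the sample segments are hypothetical and do not come from the repository.

```python
from typing import Iterable


def topic_from_parts(topic_parts: Iterable[str]) -> bytes:
    """Join topic segments into a bytes MQTT topic, as topicFromConfig does.

    Hypothetical helper: plain str values stand in for the RDF list items
    that the real method reads from the config graph.
    """
    # Each segment is encoded as ASCII; a non-ASCII segment raises
    # UnicodeEncodeError rather than being silently altered.
    return b'/'.join(part.encode('ascii') for part in topic_parts)


if __name__ == '__main__':
    # Sample segments are made up for illustration.
    print(topic_from_parts(['frontdoor', 'switch', 'state']))
```

Note that an empty list of segments produces an empty bytes topic (b''). How the real code reacts to that case (for example, where EmptyTopicError is raised) is not visible in this part of the file.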