annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1695:5c2565e63297

take out some misduided fixes
author drewp@bigasterisk.com
date Mon, 27 Sep 2021 22:55:32 -0700
parents 34eb87f68ab8
children 2085ed9cfcc4
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
4 import glob
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
5 import json
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
6 import logging
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
7 import os
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
8
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
9 from rdfdb.patch import Patch
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
10
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
11 from mqtt_message import graphFromMessage
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
12 import os
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
13 import time
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
14 from dataclasses import dataclass
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
15 from pathlib import Path
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
16 from typing import Callable, Sequence, Set, Tuple, Union, cast
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
17
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
18 import cyclone.sse
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
19 import cyclone.web
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
20 import export_to_influxdb
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
21 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
22 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
23 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
24 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
25 from docopt import docopt
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
26 from export_to_influxdb import InfluxExporter
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
27 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
29 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
30 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
31 from prometheus_client.registry import REGISTRY
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
32 from rdfdb.rdflibpatch import graphFromQuads
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
33 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
34 from rdflib.graph import ConjunctiveGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
35 from rdflib.term import Node
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
36 from rx.core import Observable
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
37 from rx.core.typing import Mapper
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
38 from standardservice.logsetup import log, verboseLogging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
39 from twisted.internet import reactor, task
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
40 from inference import Inference
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
41 from button_events import button_events
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
42 from patch_cyclone_sse import patchCycloneSse
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
43
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
44 ROOM = Namespace('http://projects.bigasterisk.com/room/')
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
45 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
46 collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
47
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
48 patchCycloneSse()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
49
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
50
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
51 def logGraph(debug: Callable, label: str, graph: Graph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
52 n3 = cast(bytes, graph.serialize(format="n3"))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
53 debug(label + ':\n' + n3.decode('utf8'))
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
54
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
55
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
56 def appendLimit(lst, elem, n=10):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
57 del lst[:len(lst) - n + 1]
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
58 lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
59
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
60
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
61 def parseDurationLiteral(lit: Literal) -> float:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
62 if lit.endswith('s'):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
63 return float(lit.split('s')[0])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
64 raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
65
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
66
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
67 @dataclass
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
68 class StreamPipelineStep:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
69 uri: URIRef # a :MqttStatementSource
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
70 config: Graph
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
71
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
72 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
73 return inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
74
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
75
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
76 class Filters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
77
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
78 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
79 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
80 if jsonEq:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
81 required = json.loads(jsonEq.toPython())
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
82
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
83 def eq(jsonBytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
84 msg = json.loads(jsonBytes.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
85 return msg == required
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
86
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
87 outStream = rx.operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
88 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
89 outStream = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
90 return outStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
91
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
92
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
93 class Parser(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
94
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
95 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
96 parser = self.getParser()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
97 return parser(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
98
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
99 def getParser(self) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
100 parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
101 func = self.getParserFunc(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
102 return rx.operators.map(cast(Mapper, func))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
103
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
104 def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
105 if parserType == XSD.double:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
106 return lambda v: Literal(float(v))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
107 elif parserType == ROOM['tagIdToUri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
108 return self.tagIdToUri
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
109 elif parserType == ROOM['onOffBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
110 return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
111 elif parserType == ROOM['jsonBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
112 return self.parseJsonBrightness
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
113 elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
114 return lambda v: self.remap(parserType, v.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
115 elif parserType == ROOM['rfCode']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
116 return self.parseJsonRfCode
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
117 elif parserType == ROOM['tradfri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
118 return self.parseTradfriMessage
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
119 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
120 raise NotImplementedError(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
121
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
122 def tagIdToUri(self, value: bytes) -> URIRef:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
123 justHex = value.decode('utf8').replace('-', '').lower()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
124 int(justHex, 16) # validate
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
125 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
126
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
127 def parseJsonBrightness(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
128 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
129 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
130
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
131 def remap(self, parser, valueStr: str) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
132 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
133 value = Literal(valueStr)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
134 for entry in g.objects(parser, ROOM['map']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
135 if value == g.value(entry, ROOM['from']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
136 to_ = g.value(entry, ROOM['to'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
137 if not isinstance(to_, Node):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
138 raise TypeError(f'{to_=}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
139 return to_
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
140 raise KeyError(value)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
141
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
142 def parseJsonRfCode(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
143 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
144 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
145
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
146 def parseTradfriMessage(self, mqttValue: bytes) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
147 log.info(f'trad {mqttValue}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
148 return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
149
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
150
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
151 class Converters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
152
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
153 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
154 out = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
155 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
156 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
157 out = self.conversionStep(conv)(out)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
158 return out
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
159
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
160 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
161 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
162 if conv == ROOM['celsiusToFarenheit']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
163
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
164 return rx.operators.map(cast(Mapper, self.c2f))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
165 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
166 threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
167 return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
168 elif conv == ROOM['buttonPress']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
169 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
170 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
171 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
172 raise NotImplementedError(conv)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
173
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
174 def c2f(self, value: Literal) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
175 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
176
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
177
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
178 class Rdfizer(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
179
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
180 def makeOutputStream(self, inStream: Observable) -> Observable:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
181 plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
182 log.debug(f'{self.uri=} has {len(plans)=}')
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
183 if not plans:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
184 return rx.empty()
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
185 outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
186 return outputQuadsSets
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
187
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
188 def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
189
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
190 def quadsFromValue(valueNode):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
191 return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
192
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
193 def emptyQuads(element) -> Set[Tuple]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
194 return set([])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
195
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
196 quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
197
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
198 dur = self.config.value(plan, ROOM['statementLifetime'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
199 if dur is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
200 sec = parseDurationLiteral(dur)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
201 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
202 quads = quads.pipe(
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
203 rx.operators.debounce(sec, loop),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
204 rx.operators.map(cast(Mapper, emptyQuads)),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
205 rx.operators.merge(quads),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
206 )
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
207
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
208 return quads
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
209
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
211 def truncTime():
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
212 return round(time.time(), 3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
213
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
214
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
215 def tightN3(node: Union[URIRef, Literal]) -> str:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
216 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
217
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
218
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
219 def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
220 graph.bind('', ROOM)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
221 n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8')
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
222 if hidePrefixes:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
223 n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
224 return n3
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
225
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
226
1643
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
227 class EmptyTopicError(ValueError):
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
228 pass
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
229
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
230
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
231 class MqttStatementSource:
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
232
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
233 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
234 influxExport: InfluxExporter, inference: Inference):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
235 self.uri = uri
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
236 self.config = config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
237 self.masterGraph = masterGraph
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
238 self.debugPageData = debugPageData
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
239 self.mqtt = mqtt # deprecated
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
240 self.internalMqtt = internalMqtt
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
241 self.influxExport = influxExport
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
242 self.inference = inference
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
243
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
244 self.mqttTopic = self.topicFromConfig(self.config)
1643
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
245 if self.mqttTopic == b'':
c04b5303eb08 try not to make empty topic subscriptions
drewp@bigasterisk.com
parents: 1642
diff changeset
246 raise EmptyTopicError(f"empty topic for {uri=}")
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
247 log.debug(f'new mqttTopic {self.mqttTopic}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
248
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
249 self.debugSub = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
250 'topic': self.mqttTopic.decode('ascii'),
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
251 'recentMessageGraphs': [],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
252 'recentMetrics': [],
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
253 'currentOutputGraph': {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
254 't': 1,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
255 'n3': "(n3)"
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
256 },
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
257 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
258 self.debugPageData['subscribed'].append(self.debugSub)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
259
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
260 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
261 rawBytes.subscribe(on_next=self.countIncomingMessage)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
262
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
263 rawBytes.subscribe_(self.onMessage)
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
264
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
265 def onMessage(self, raw: bytes):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
266 g = graphFromMessage(self.mqttTopic, raw)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
267 logGraph(log.debug, 'message graph', g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
268 appendLimit(
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
269 self.debugSub['recentMessageGraphs'],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
270 { #
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
271 't': truncTime(),
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
272 'n3': serializeWithNs(g, hidePrefixes=True)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
273 })
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
274
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
275 implied = self.inference.infer(g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
276 self.updateMasterGraph(implied)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
277
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
278 def topicFromConfig(self, config) -> bytes:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
279 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
280 return b'/'.join(t.encode('ascii') for t in topicParts)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
281
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
282 def subscribeMqtt(self, topic: bytes):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
283 # goal is to get everyone on the internal broker and eliminate this
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
284 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
285 return mqtt.subscribe(topic)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
286
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
287 def countIncomingMessage(self, msg: bytes):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
288 self.debugPageData['messagesSeen'] += 1
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
289 MESSAGES_SEEN.inc()
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
290
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
291 def updateInflux(self, newGraphs):
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
292 for g in newGraphs:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
293 self.influxExport.exportToInflux(g)
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
294
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
295 def updateMasterGraph(self, newGraph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
296 log.debug(f'{self.uri} update to {len(newGraph)} statements')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
297
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
298 cg = ConjunctiveGraph()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
299 for stmt in newGraph:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
300 cg.add(stmt + (self.uri,))
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
301 # meas = stmt[0].split('/')[-1]
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
302 # if meas.startswith('airQuality'):
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
303 # where_prefix, type_ = meas[len('airQuality'):].split('door')
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
304 # where = where_prefix + 'door'
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
305 # metric = 'air'
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
306 # tags = {'loc': where.lower(), 'type': type_.lower()}
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
307 # val = stmt[2].toPython()
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
308 # if metric not in collectors:
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
309 # collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
310
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
311 # collectors[metric].labels(**tags).set(val)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
312
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
313 self.masterGraph.patchSubgraph(self.uri, cg)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
314 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
315
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
316
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
317 class Metrics(cyclone.web.RequestHandler):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
318
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
319 def get(self):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
320 self.add_header('content-type', 'text/plain')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
321 self.write(generate_latest(REGISTRY))
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
322
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
323
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
324 class DebugPageData(cyclone.sse.SSEHandler):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
325
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
326 def __init__(self, application, request):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
327 cyclone.sse.SSEHandler.__init__(self, application, request)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
328 self.lastSent = None
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
329
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
330 def watch(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
331 try:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
332 dpd = self.settings.debugPageData
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
333 js = json.dumps(dpd, sort_keys=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
334 if js != self.lastSent:
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
335 log.debug('sending dpd update')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
336 self.sendEvent(message=js.encode('utf8'))
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
337 self.lastSent = js
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
338 except Exception:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
339 import traceback
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
340 traceback.print_exc()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
341
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
342 def bind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
343 self.loop = task.LoopingCall(self.watch)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
344 self.loop.start(1, now=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
345
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
346 def unbind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
347 self.loop.stop()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
348
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
349
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
350 @dataclass
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
351 class WatchFiles:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
352 # could be merged with rdfdb.service and GraphFile code
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
353 globPattern: str
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
354 outGraph: Graph
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
355
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
356 def __post_init__(self):
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
357 self.lastUpdate = 0
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
358 task.LoopingCall(self.refresh).start(1)
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
359 log.info(f'start watching {self.globPattern}')
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
360
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
361 def refresh(self):
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
362 files = glob.glob(self.globPattern)
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
363 for fn in files:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
364 if os.path.getmtime(fn) > self.lastUpdate:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
365 break
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
366 else:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
367 return
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
368 self.lastUpdate = time.time()
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
369 self.outGraph.remove((None, None, None))
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
370 log.info('reread config')
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
371 for fn in files:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
372 # todo: handle read errors
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
373 self.outGraph.parse(fn, format='n3')
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
374 # and notify this change,so we can recalc the latest output
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
375
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
376
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
377 if __name__ == '__main__':
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
378 arg = docopt("""
733
9ca69f2be87b more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents: 732
diff changeset
379 Usage: mqtt_to_rdf.py [options]
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
380
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
381 -v Verbose
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
382 --cs=STR Only process config filenames with this substring
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
383 """)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
384 verboseLogging(arg['-v'])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
385 logging.getLogger('mqtt').setLevel(logging.INFO)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
386 logging.getLogger('mqtt_client').setLevel(logging.INFO)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
387 logging.getLogger('infer').setLevel(logging.INFO)
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
388 logging.getLogger('cbind').setLevel(logging.INFO)
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
389 # log.setLevel(logging.DEBUG)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
390 log.info('log start')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
391
1647
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
392 config = ConjunctiveGraph()
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
393 watcher = WatchFiles('conf/rules.n3', config)
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
394 # for fn in Path('.').glob('conf/*.n3'):
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
395 # if not arg['--cs'] or str(arg['--cs']) in str(fn):
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
396 # log.debug(f'loading {fn}')
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
397 # config.parse(str(fn), format='n3')
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
398 # else:
34eb87f68ab8 WIP rules reloader (doesn't reload yet)
drewp@bigasterisk.com
parents: 1644
diff changeset
399 # log.debug(f'skipping {fn}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
400
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
401 masterGraph = PatchableGraph()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
402
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
403 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
404 brokerPort = 10210
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
405
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
406 debugPageData = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
407 # schema in index.ts
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
408 'server': f'{brokerHost}:{brokerPort}',
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
409 'messagesSeen': 0,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
410 'subscribed': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
411 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
412
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
413 inference = Inference()
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
414 inference.setRules(config)
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
415 expandedConfig = inference.infer(config)
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
416 expandedConfig += inference.nonRuleStatements()
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
417 log.info('expanded config:')
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
418 for stmt in sorted(expandedConfig):
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
419 log.info(f' {stmt}')
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
420
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
421 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
422 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
423
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
424 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
425
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
426 # this needs to be part of the config reload. Maybe GraphFile patch output would be better?
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
427 srcs = []
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
428 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
429 log.info(f'setup source {src=}')
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
430 try:
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
431 srcs.append(
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
432 MqttStatementSource(src,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
433 expandedConfig,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
434 masterGraph,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
435 mqtt=mqtt,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
436 internalMqtt=internalMqtt,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
437 debugPageData=debugPageData,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
438 influxExport=influxExport,
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
439 inference=inference))
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
440 except EmptyTopicError:
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
441 continue
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
442 log.info(f'set up {len(srcs)} sources')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
443
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
444 peg = PatchableGraph()
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
445 peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig]))
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
446
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
447 port = 10018
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
448 reactor.listenTCP(port,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
449 cyclone.web.Application([
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
450 (r"/()", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
451 "path": ".",
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
452 "default_filename": "index.html"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
453 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
454 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
455 "path": "build"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
456 }),
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
457 (r"/graph/config", CycloneGraphHandler, {
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
458 'masterGraph': peg,
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
459 }),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
460 (r"/graph/mqtt", CycloneGraphHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
461 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
462 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
463 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
464 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
465 }),
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
466 (r'/debugPageData', DebugPageData),
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
467 (r'/metrics', Metrics),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
468 ],
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
469 mqtt=mqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
470 internalMqtt=internalMqtt,
1644
9e7f571deedf mqtt_to_rdf.py updates
drewp@bigasterisk.com
parents: 1643
diff changeset
471 expandedConfig=expandedConfig,
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
472 masterGraph=masterGraph,
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
473 debugPageData=debugPageData,
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
474 debug=arg['-v']),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
475 interface='::')
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
476 log.info('serving on %s', port)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
477
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
478 reactor.run()