annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1583:b0608eb6e90c

dead code, sort reqs
author drewp@bigasterisk.com
date Sun, 29 Aug 2021 13:43:14 -0700
parents 6ddc5e037f15
children 1c36ad1eb8b3
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
4 import os
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
5 import time
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
6 import json
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
7 import logging
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
8 from pathlib import Path
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
9 from typing import Callable, Sequence, Set, Tuple, Union, cast
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
10 from cyclone.util import ObjectDict
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
11 from rdflib.graph import ConjunctiveGraph
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
12
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
13 from rx.core.typing import Mapper
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
14
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
15 from export_to_influxdb import InfluxExporter
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
16
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
17 import cyclone.web
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
18 import cyclone.sse
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
19 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
20 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
21 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
22 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
23 from docopt import docopt
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
24 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
25 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
26 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
27 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
28 from prometheus_client.registry import REGISTRY
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
29 from rdfdb.rdflibpatch import graphFromQuads
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
30 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
31 from rdflib.term import Node
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
32 from rx.core import Observable
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
33 from standardservice.logsetup import log, verboseLogging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
34 from twisted.internet import reactor, task
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
35 from dataclasses import dataclass
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
36 from button_events import button_events
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
37 from patch_cyclone_sse import patchCycloneSse
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
38
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
39 ROOM = Namespace('http://projects.bigasterisk.com/room/')
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
40 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
41 collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
42
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
43 import export_to_influxdb
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
44
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
45 print(f'merge me back {export_to_influxdb}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
46
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
47 patchCycloneSse()
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
48
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
49
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
50 def appendLimit(lst, elem, n=10):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
51 del lst[:len(lst) - n + 1]
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
52 lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
53
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
54
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
55 def parseDurationLiteral(lit: Literal) -> float:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
56 if lit.endswith('s'):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
57 return float(lit.split('s')[0])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
58 raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
59
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
60
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
61 @dataclass
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
62 class StreamPipelineStep:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
63 uri: URIRef # a :MqttStatementSource
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
64 config: Graph
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
65
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
66 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
67 return inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
68
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
69
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
70 class Filters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
71
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
72 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
73 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
74 if jsonEq:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
75 required = json.loads(jsonEq.toPython())
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
76
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
77 def eq(jsonBytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
78 msg = json.loads(jsonBytes.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
79 return msg == required
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
80
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
81 outStream = rx.operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
82 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
83 outStream = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
84 return outStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
85
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
86
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
87 class Parser(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
88
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
89 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
90 parser = self.getParser()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
91 return parser(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
92
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
93 def getParser(self) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
94 parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
95 func = self.getParserFunc(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
96 return rx.operators.map(cast(Mapper, func))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
97
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
98 def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
99 if parserType == XSD.double:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
100 return lambda v: Literal(float(v))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
101 elif parserType == ROOM['tagIdToUri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
102 return self.tagIdToUri
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
103 elif parserType == ROOM['onOffBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
104 return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
105 elif parserType == ROOM['jsonBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
106 return self.parseJsonBrightness
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
107 elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
108 return lambda v: self.remap(parserType, v.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
109 elif parserType == ROOM['rfCode']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
110 return self.parseJsonRfCode
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
111 elif parserType == ROOM['tradfri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
112 return self.parseTradfriMessage
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
113 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
114 raise NotImplementedError(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
115
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
116 def tagIdToUri(self, value: bytes) -> URIRef:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
117 justHex = value.decode('utf8').replace('-', '').lower()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
118 int(justHex, 16) # validate
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
119 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
120
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
121 def parseJsonBrightness(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
122 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
123 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
124
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
125 def remap(self, parser, valueStr: str) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
126 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
127 value = Literal(valueStr)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
128 for entry in g.objects(parser, ROOM['map']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
129 if value == g.value(entry, ROOM['from']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
130 to_ = g.value(entry, ROOM['to'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
131 if not isinstance(to_, Node):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
132 raise TypeError(f'{to_=}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
133 return to_
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
134 raise KeyError(value)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
135
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
136 def parseJsonRfCode(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
137 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
138 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
139
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
140 def parseTradfriMessage(self, mqttValue: bytes) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
141 log.info(f'trad {mqttValue}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
142 return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
143
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
144
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
145 class Converters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
146
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
147 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
148 out = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
149 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
150 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
151 out = self.conversionStep(conv)(out)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
152 return out
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
153
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
154 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
155 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
156 if conv == ROOM['celsiusToFarenheit']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
157
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
158 return rx.operators.map(cast(Mapper, self.c2f))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
159 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
160 threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
161 return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
162 elif conv == ROOM['buttonPress']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
163 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
164 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
165 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
166 raise NotImplementedError(conv)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
167
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
168 def c2f(self, value: Literal) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
169 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
170
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
171
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
172 class Rdfizer(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
173
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
174 def makeOutputStream(self, inStream: Observable) -> Observable:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
175 plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
176 log.debug(f'{self.uri=} has {len(plans)=}')
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
177 if not plans:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
178 return rx.empty()
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
179 outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
180 return outputQuadsSets
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
181
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
182 def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
183
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
184 def quadsFromValue(valueNode):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
185 return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
186
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
187 def emptyQuads(element) -> Set[Tuple]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
188 return set([])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
189
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
190 quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
191
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
192 dur = self.config.value(plan, ROOM['statementLifetime'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
193 if dur is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
194 sec = parseDurationLiteral(dur)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
195 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
196 quads = quads.pipe(
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
197 rx.operators.debounce(sec, loop),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
198 rx.operators.map(cast(Mapper, emptyQuads)),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
199 rx.operators.merge(quads),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
200 )
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
201
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
202 return quads
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
203
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
204
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
205 def truncTime():
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
206 return round(time.time(), 3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
207
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
208
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
209 def tightN3(node: Union[URIRef, Literal]) -> str:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
211
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
212
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
213 def serializeWithNs(graph: ConjunctiveGraph) -> bytes:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
214 graph.bind('', ROOM)
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
215 return cast(bytes, graph.serialize(format='n3'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
216
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
217
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
218 class MqttStatementSource:
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
219
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
220 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
221 influxExport: InfluxExporter):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
222 self.uri = uri
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
223 self.config = config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
224 self.masterGraph = masterGraph
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
225 self.debugPageData = debugPageData
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
226 self.mqtt = mqtt # deprecated
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
227 self.internalMqtt = internalMqtt
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
228 self.influxExport = influxExport
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
229
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
230 self.mqttTopic = self.topicFromConfig(self.config)
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
231 log.debug(f'new mqttTopic {self.mqttTopic}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
232
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
233 self.debugSub = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
234 'topic': self.mqttTopic.decode('ascii'),
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
235 'recentMessages': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
236 'recentParsed': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
237 'recentConversions': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
238 'currentMetrics': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
239 'currentOutputGraph': {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
240 't': 1,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
241 'n3': "(n3)"
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
242 },
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
243 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
244 self.debugPageData['subscribed'].append(self.debugSub)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
245
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
246 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
247 rawBytes.subscribe(on_next=self.countIncomingMessage)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
248
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
249 filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
250
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
251 parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
252 parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
253
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
254 convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
255 convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
256
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
257 outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
258 outputQuadsSets.subscribe_(self.updateInflux)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
259
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
260 outputQuadsSets.subscribe_(self.updateMasterGraph)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
261
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
262 def topicFromConfig(self, config) -> bytes:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
263 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
264 return b'/'.join(t.encode('ascii') for t in topicParts)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
265
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
266 def subscribeMqtt(self, topic: bytes):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
267 # goal is to get everyone on the internal broker and eliminate this
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
268 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
269 return mqtt.subscribe(topic)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
270
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
271 def countIncomingMessage(self, msg: bytes):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
272 self.debugPageData['messagesSeen'] += 1
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
273 MESSAGES_SEEN.inc()
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
274
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
275 appendLimit(self.debugSub['recentMessages'], {
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
276 't': truncTime(),
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
277 'msg': msg.decode('ascii'),
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
278 })
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
279
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
280 def updateInflux(self, newGraphs):
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
281 for g in newGraphs:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
282 self.influxExport.exportToInflux(g)
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
283
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
284 def updateMasterGraph(self, newGraphs):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
285 newQuads = set.union(*newGraphs)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
286 g = graphFromQuads(newQuads)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
287 log.debug(f'{self.uri} update to {len(newQuads)} statements')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
288
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
289 for quad in newQuads:
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
290 meas = quad[0].split('/')[-1]
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
291 if meas.startswith('airQuality'):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
292 where_prefix, type_ = meas[len('airQuality'):].split('door')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
293 where = where_prefix + 'door'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
294 metric = 'air'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
295 tags = {'loc': where.lower(), 'type': type_.lower()}
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
296 val = quad[2].toPython()
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
297 if metric not in collectors:
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
298 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
299
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
300 collectors[metric].labels(**tags).set(val)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
301
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
302 self.masterGraph.patchSubgraph(self.uri, g)
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
303 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
304
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
305
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
306 class Metrics(cyclone.web.RequestHandler):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
307
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
308 def get(self):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
309 self.add_header('content-type', 'text/plain')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
310 self.write(generate_latest(REGISTRY))
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
311
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
312
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
313 class DebugPageData(cyclone.sse.SSEHandler):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
314
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
315 def __init__(self, application, request):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
316 cyclone.sse.SSEHandler.__init__(self, application, request)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
317 self.lastSent = None
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
318
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
319 def watch(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
320 try:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
321 dpd = self.settings.debugPageData
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
322 js = json.dumps(dpd, sort_keys=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
323 if js != self.lastSent:
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
324 log.debug('sending dpd update')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
325 self.sendEvent(message=js.encode('utf8'))
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
326 self.lastSent = js
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
327 except Exception:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
328 import traceback
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
329 traceback.print_exc()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
330
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
331 def bind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
332 self.loop = task.LoopingCall(self.watch)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
333 self.loop.start(1, now=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
334
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
335 def unbind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
336 self.loop.stop()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
337
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
338
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
339 if __name__ == '__main__':
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
340 arg = docopt("""
733
9ca69f2be87b more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents: 732
diff changeset
341 Usage: mqtt_to_rdf.py [options]
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
342
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
343 -v Verbose
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
344 --cs=STR Only process config filenames with this substring
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
345 """)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
346 verboseLogging(arg['-v'])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
347 logging.getLogger('mqtt').setLevel(logging.INFO)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
348 logging.getLogger('mqtt_client').setLevel(logging.INFO)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
349
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
350 config = Graph()
798
cdc76c84e3e2 move conf into subdir
drewp@bigasterisk.com
parents: 797
diff changeset
351 for fn in Path('.').glob('conf/*.n3'):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
352 if not arg['--cs'] or str(arg['--cs']) in str(fn):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
353 log.debug(f'loading {fn}')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
354 config.parse(str(fn), format='n3')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
355 else:
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
356 log.debug(f'skipping {fn}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
357
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
358 masterGraph = PatchableGraph()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
359
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
360 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
361 brokerPort = 10210
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
362
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
363 debugPageData = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
364 # schema in index.ts
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
365 'server': f'{brokerHost}:{brokerPort}',
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
366 'messagesSeen': 0,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
367 'subscribed': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
368 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
369
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
370 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
371 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
372
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
373 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
374
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
375 srcs = []
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
376 for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
377 srcs.append(
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
378 MqttStatementSource(src,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
379 config,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
380 masterGraph,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
381 mqtt=mqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
382 internalMqtt=internalMqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
383 debugPageData=debugPageData,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
384 influxExport=influxExport))
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
385 log.info(f'set up {len(srcs)} sources')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
386
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
387 port = 10018
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
388 reactor.listenTCP(port,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
389 cyclone.web.Application([
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
390 (r"/()", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
391 "path": ".",
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
392 "default_filename": "index.html"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
393 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
394 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
395 "path": "build"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
396 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
397 (r"/graph/mqtt", CycloneGraphHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
398 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
399 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
400 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
401 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
402 }),
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
403 (r'/debugPageData', DebugPageData),
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
404 (r'/metrics', Metrics),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
405 ],
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
406 mqtt=mqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
407 internalMqtt=internalMqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
408 masterGraph=masterGraph,
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
409 debugPageData=debugPageData,
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
410 debug=arg['-v']),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
411 interface='::')
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
412 log.info('serving on %s', port)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
413
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
414 reactor.run()