Mercurial > code > home > repos > homeauto
annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1577:6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
author | drewp@bigasterisk.com |
---|---|
date | Thu, 26 Aug 2021 16:33:05 -0700 |
parents | e0e623c01a69 |
children | b0608eb6e90c |
rev | line source |
---|---|
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
1 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
2 Subscribe to mqtt topics; generate RDF statements. |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
3 """ |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
4 import os |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
5 import time |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
6 import json |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
7 import logging |
767 | 8 from pathlib import Path |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
9 from typing import Callable, Sequence, Set, Tuple, Union, cast |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
10 from cyclone.util import ObjectDict |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
11 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
12 from rx.core.typing import Mapper |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
13 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
14 from export_to_influxdb import InfluxExporter |
780 | 15 |
16 import cyclone.web | |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
17 import cyclone.sse |
797 | 18 import prometheus_client |
19 import rx | |
20 import rx.operators | |
21 import rx.scheduler.eventloop | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
22 from docopt import docopt |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
23 from mqtt_client import MqttClient |
797 | 24 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) |
25 from prometheus_client import Counter, Gauge, Histogram, Summary | |
26 from prometheus_client.exposition import generate_latest | |
27 from prometheus_client.registry import REGISTRY | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
28 from rdfdb.rdflibpatch import graphFromQuads |
797 | 29 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef |
780 | 30 from rdflib.term import Node |
791 | 31 from rx.core import Observable |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
32 from standardservice.logsetup import log, verboseLogging |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
33 from twisted.internet import reactor, task |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
34 from dataclasses import dataclass |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
35 from button_events import button_events |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
36 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
# RDF namespace for the room vocabulary; predicates and types in this module
# are looked up as ROOM['name'].
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Module-level dict; nothing in the visible part of this file fills it.
# NOTE(review): presumably a registry of metric collectors -- confirm against users.
collectors = {}
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
40 |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
import export_to_influxdb

# NOTE(review): this print runs every time the module is imported. The message
# suggests export_to_influxdb is waiting to be merged back -- confirm whether
# the import and the print are still needed.
print(f'merge me back {export_to_influxdb}')
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
44 |
780 | 45 |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
def appendLimit(lst, elem, n=10):
    """Append ``elem`` to ``lst`` in place, keeping at most ``n`` items.

    When the list is already full, the oldest entries (the front of the
    list) are removed to make room for the new element.

    Args:
        lst: list to modify in place.
        elem: element to add at the end.
        n: maximum length of ``lst`` after the append. For ``n`` below 1 the
            list ends up holding only ``elem``.
    """
    excess = len(lst) - n + 1
    # Trim only when there really is an excess. A negative slice bound would
    # count from the end of the list and silently discard items from a list
    # that still has room.
    if excess > 0:
        del lst[:excess]
    lst.append(elem)
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
49 |
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
50 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
def parseDurationLiteral(lit: Literal) -> float:
    """Convert a duration literal such as ``30s`` into a number of seconds.

    Only values ending in ``s`` are understood.

    Raises:
        NotImplementedError: if ``lit`` does not end with ``s``.
    """
    if not lit.endswith('s'):
        raise NotImplementedError(f'duration literal: {lit}')
    # The numeric part is everything before the first 's'.
    number, _, _ = lit.partition('s')
    return float(number)
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
55 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
56 |
1577
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
@dataclass
class StreamPipelineStep:
    """One stage of a stream pipeline, configured from an RDF graph.

    Subclasses in this file override makeOutputStream and read their settings
    with ``self.config.value(self.uri, ...)``. This base implementation passes
    the input stream through unchanged.
    """
    uri: URIRef  # a :MqttStatementSource
    config: Graph  # graph the step's settings are read from

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return the output stream for this step; here, ``inStream`` itself."""
        return inStream
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
64 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
65 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Filters(StreamPipelineStep):
    """Pipeline step that can drop payloads not equal to a configured JSON value."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Filter ``inStream`` by the step's ROOM['filterPayloadJsonEquals'] setting.

        With no (or a falsy) setting the input stream is returned as is.
        Otherwise each element is decoded as utf8 JSON and kept only when it
        equals the JSON value parsed from the setting.
        """
        expectedJson = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if not expectedJson:
            return inStream

        required = json.loads(expectedJson.toPython())

        def payloadMatches(jsonBytes):
            decoded = json.loads(jsonBytes.decode('utf8'))
            return decoded == required

        keepMatching = rx.operators.filter(payloadMatches)
        return keepMatching(inStream)
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
81 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
82 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Parser(StreamPipelineStep):
    """Pipeline step that turns raw MQTT payload bytes into RDF nodes.

    The parsing function is chosen from the ROOM['parser'] value configured
    for this step's uri.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply the configured parser to every element of ``inStream``."""
        return self.getParser()(inStream)

    def getParser(self) -> Callable[[Observable], Observable]:
        """Build the rx ``map`` operator that wraps the configured parser function."""
        parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
        parseFunc = self.getParserFunc(parserType)
        return rx.operators.map(cast(Mapper, parseFunc))

    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
        """Return the bytes-to-node function selected by ``parserType``.

        Raises:
            NotImplementedError: if ``parserType`` is not one of the handled kinds.
        """
        # The checks run in a fixed order; the first match wins.
        if parserType == XSD.double:

            def parseDouble(payload):
                return Literal(float(payload))

            return parseDouble

        if parserType == ROOM['tagIdToUri']:
            return self.tagIdToUri

        if parserType == ROOM['onOffBrightness']:

            def parseOnOff(payload):
                # Anything other than the exact bytes b'OFF' counts as on.
                return Literal(0.0) if payload == b'OFF' else Literal(1.0)

            return parseOnOff

        if parserType == ROOM['jsonBrightness']:
            return self.parseJsonBrightness

        if ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):

            def parseMapped(payload):
                return self.remap(parserType, payload.decode('utf8'))

            return parseMapped

        if parserType == ROOM['rfCode']:
            return self.parseJsonRfCode

        if parserType == ROOM['tradfri']:
            return self.parseTradfriMessage

        raise NotImplementedError(parserType)

    def tagIdToUri(self, value: bytes) -> URIRef:
        """Map a hex tag id (dashes allowed) to an rfidCard URI.

        Raises:
            ValueError: if the id, with dashes removed, does not parse as base 16.
        """
        text = value.decode('utf8')
        justHex = text.replace('-', '').lower()
        int(justHex, 16)  # validate
        return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')

    def parseJsonBrightness(self, mqttValue: bytes):
        """Parse a JSON message with 'state' and 'brightness' keys into a Literal.

        The result is brightness / 255 when state is 'ON', otherwise 0.0.
        """
        msg = json.loads(mqttValue.decode('utf8'))
        if msg['state'] != 'ON':
            return Literal(0.0)
        return Literal(float(msg['brightness'] / 255))

    def remap(self, parser, valueStr: str) -> Node:
        """Translate ``valueStr`` through the ROOM['map'] entries of ``parser``.

        Returns the ROOM['to'] node of the first entry whose ROOM['from']
        equals the value.

        Raises:
            TypeError: if the matching entry's ROOM['to'] is not a Node.
            KeyError: if no entry matches.
        """
        g = self.config
        value = Literal(valueStr)
        matching = (entry for entry in g.objects(parser, ROOM['map']) if value == g.value(entry, ROOM['from']))
        for entry in matching:
            to_ = g.value(entry, ROOM['to'])
            if not isinstance(to_, Node):
                raise TypeError(f'{to_=}')
            return to_
        raise KeyError(value)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Parse a JSON message with 'code0' and 'code1' into a 16-hex-digit Literal."""
        msg = json.loads(mqttValue.decode('utf8'))
        codes = (msg['code0'], msg['code1'])
        return Literal('%08x%08x' % codes)

    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
        """Log the payload and return a placeholder Literal; real parsing is not written yet."""
        log.info(f'trad {mqttValue}')
        return Literal('todo')
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
139 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
140 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Converters(StreamPipelineStep):
    """Pipeline step that chains the conversions listed under ROOM['conversions']."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply each configured conversion, in list order, on top of ``inStream``."""
        g = self.config
        conversions = g.items(g.value(self.uri, ROOM['conversions']))
        out = inStream
        for conv in conversions:
            step = self.conversionStep(conv)
            out = step(out)
        return out

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the rx operator for a single conversion node.

        Raises:
            NotImplementedError: if ``conv`` is not a recognized conversion.
        """
        g = self.config

        if conv == ROOM['celsiusToFarenheit']:
            return rx.operators.map(cast(Mapper, self.c2f))

        thresholdNode = g.value(conv, ROOM['ignoreValueBelow'], default=None)
        if thresholdNode is not None:
            threshold = cast(Literal, thresholdNode).toPython()

            def atOrAboveThreshold(value):
                return cast(Literal, value).toPython() >= threshold

            return rx.operators.filter(atOrAboveThreshold)

        if conv == ROOM['buttonPress']:
            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)

        raise NotImplementedError(conv)

    def c2f(self, value: Literal) -> Node:
        """Convert a Celsius Literal to Fahrenheit, rounded to 2 decimal places."""
        fahrenheit = cast(float, value.toPython()) * 1.8 + 32
        return Literal(round(fahrenheit, 2))
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
166 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
167 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
class Rdfizer(StreamPipelineStep):
    """Pipeline step that turns parsed values into sets of quads."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Combine the latest quad set from every ROOM['graphStatements'] plan."""
        plans = self.config.objects(self.uri, ROOM['graphStatements'])
        perPlanStreams = [self.makeQuads(inStream, plan) for plan in plans]
        return rx.combine_latest(*perPlanStreams)

    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
        """Return a stream of quad sets for one statement plan.

        Each input value becomes a one-element set holding the tuple
        (uri, outputPredicate, value, uri). If the plan has a
        ROOM['statementLifetime'], the value stream is also debounced by that
        many seconds and mapped to empty sets, and that stream is merged with
        the value stream.
        """

        def quadsFromValue(valueNode):
            # The predicate is looked up on every value, as in the original.
            predicate = self.config.value(plan, ROOM['outputPredicate'])
            return {(self.uri, predicate, valueNode, self.uri)}

        def emptyQuads(element) -> Set[Tuple]:
            return set()

        toQuads = rx.operators.map(cast(Mapper, quadsFromValue))
        statements = toQuads(inStream)

        dur = self.config.value(plan, ROOM['statementLifetime'])
        if dur is None:
            return statements

        sec = parseDurationLiteral(dur)
        loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
        return statements.pipe(
            rx.operators.debounce(sec, loop),
            rx.operators.map(cast(Mapper, emptyQuads)),
            rx.operators.merge(statements),
        )
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
196 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
197 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
def truncTime():
    """Return the current ``time.time()`` value rounded to 3 decimal places."""
    now = time.time()
    return round(now, 3)
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
200 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
201 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
def tightN3(node: Union[URIRef, Literal]) -> str:
    """Render `node` as N3, abbreviating the XML Schema namespace URI to 'xsd:'."""
    xsdNamespace = 'http://www.w3.org/2001/XMLSchema#'
    rendered = node.n3()
    return rendered.replace(xsdNamespace, 'xsd:')
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
204 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
205 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
def serializeWithNs(graph: PatchableGraph) -> bytes:
    """Serialize `graph` as N3 after binding the empty prefix to the room namespace.

    Note that the binding is applied to the graph's underlying `_graph`
    object, so it persists after this call.
    """
    roomNamespace = 'http://projects.bigasterisk.com/room/'
    graph._graph.bind('', roomNamespace)
    n3Bytes = graph.serialize(format='n3')
    return cast(bytes, n3Bytes)
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
209 |
6ddc5e037f15
big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents:
799
diff
changeset
|
210 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
class MqttStatementSource:
    """One configured MQTT topic whose messages are converted to RDF statements.

    On construction this reads the topic for `uri` from `config`, registers a
    debug record in `debugPageData['subscribed']`, subscribes to the topic and
    wires the message stream through Filters -> Parser -> Converters ->
    Rdfizer. Each emitted set of quads replaces this source's subgraph in
    `masterGraph`.
    """

    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
                 influxExport: InfluxExporter):
        """Subscribe to the topic configured for `uri` and build the processing pipeline.

        uri: subject in `config` describing this source.
        config: graph holding the source's settings.
        masterGraph: output graph; patched in updateMasterGraph.
        mqtt: deprecated client, still used for most topics (see subscribeMqtt).
        internalMqtt: client for the internal broker.
        debugPageData: shared dict; must have 'subscribed' (list) and
            'messagesSeen' (int) entries.
        influxExport: stored on the instance; not otherwise used by this class.
        """
        self.uri = uri
        self.config = config
        self.masterGraph = masterGraph
        self.debugPageData = debugPageData
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt
        self.influxExport = influxExport

        self.mqttTopic = self.topicFromConfig(self.config)
        log.debug(f'new mqttTopic {self.mqttTopic}')

        # Per-subscription record shown on the debug page. The
        # 'currentOutputGraph' values are placeholders until the first update.
        self.debugSub = {
            'topic': self.mqttTopic.decode('ascii'),
            'recentMessages': [],
            'recentParsed': [],
            'recentConversions': [],
            'currentMetrics': [],
            'currentOutputGraph': {
                't': 1,
                'n3': "(n3)"
            },
        }
        self.debugPageData['subscribed'].append(self.debugSub)

        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
        rawBytes.subscribe(on_next=self.countIncomingMessage)

        filteredBytes = Filters(uri, config).makeOutputStream(rawBytes)

        parsedObjs = Parser(uri, config).makeOutputStream(filteredBytes)
        parsedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentParsed'], {'t': truncTime(), 'n3': tightN3(v)}))

        convertedObjs = Converters(uri, config).makeOutputStream(parsedObjs)
        convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))

        outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)

        outputQuadsSets.subscribe_(self.updateMasterGraph)

    def topicFromConfig(self, config) -> bytes:
        """Return the topic for self.uri: the items of its room:mqttTopic list joined with '/'."""
        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)

    def subscribeMqtt(self, topic: bytes):
        """Subscribe to `topic` on the appropriate broker and return the result of subscribe()."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, msg: bytes):
        """Count one raw message and record it in the debug page's recent-message list."""
        self.debugPageData['messagesSeen'] += 1

        appendLimit(
            self.debugSub['recentMessages'],
            {
                't': truncTime(),
                # Payloads are arbitrary bytes; a strict ascii decode would raise
                # inside the observer for any non-ASCII message. ASCII payloads
                # decode exactly as before.
                'msg': msg.decode('utf8', errors='replace'),
            })

    def updateMasterGraph(self, newGraphs):
        """Replace this source's subgraph in masterGraph with the union of `newGraphs`.

        newGraphs: iterable of collections of quads; may be empty, in which
            case the subgraph is patched with an empty graph.
        Also updates the prometheus air-quality gauges and the N3 text shown
        on the debug page.
        """
        # set().union(...) instead of set.union(...): the unbound form raises
        # TypeError when newGraphs is empty.
        newQuads = set().union(*newGraphs)
        g = graphFromQuads(newQuads)
        log.debug(f'{self.uri} update to {len(newQuads)} statements')

        for quad in newQuads:
            self._exportAirQualityMetric(quad)

        self.masterGraph.patchSubgraph(self.uri, g)
        # NOTE(review): only 'n3' is refreshed here; 't' keeps its initial
        # placeholder value. Confirm whether the debug page expects a timestamp.
        self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8')

    def _exportAirQualityMetric(self, quad):
        """Set the 'air' gauge from `quad` if its subject's last path segment starts with 'airQuality'."""
        meas = quad[0].split('/')[-1]
        if not meas.startswith('airQuality'):
            return

        # The rest of the name is split at the first 'door'. A name without
        # 'door' used to raise ValueError on unpacking and abort the whole
        # graph update; it is now skipped with a warning.
        where_prefix, sep, type_ = meas[len('airQuality'):].partition('door')
        if not sep:
            log.warning(f'cannot derive air quality tags from {meas!r}')
            return

        where = where_prefix + 'door'
        metric = 'air'
        tags = {'loc': where.lower(), 'type': type_.lower()}
        val = quad[2].toPython()
        if metric not in collectors:
            collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

        collectors[metric].labels(**tags).set(val)
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
291 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
292 |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
class Metrics(cyclone.web.RequestHandler):
    """HTTP handler that returns the prometheus REGISTRY in its text exposition format."""

    def get(self):
        """Write the latest metrics with a text/plain content type."""
        # set_header replaces the header value; add_header appends another
        # header line and so does not override the handler's default
        # Content-Type (presumably text/html in cyclone -- see RequestHandler docs).
        self.set_header('Content-Type', 'text/plain')
        self.write(generate_latest(REGISTRY))
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
298 |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
299 |
799
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
class DebugPageData(cyclone.sse.SSEHandler):
    """Server-sent-events handler that pushes settings.debugPageData as JSON whenever it changes."""

    def __init__(self, application, request):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        # JSON text of the last event sent, used to suppress duplicate events.
        self.lastSent = None
        # Polling LoopingCall; created in bind().
        self.loop = None

    def watch(self):
        """Send the debug data to the client if its JSON form differs from the last one sent."""
        try:
            dpd = self.settings.debugPageData
            js = json.dumps(dpd, sort_keys=True)
            if js != self.lastSent:
                log.debug('sending dpd update')
                self.sendEvent(message=js.encode('utf8'))
                self.lastSent = js
        except Exception:
            # Deliberately best-effort: an exception escaping here would stop
            # the LoopingCall. Report it through the module logger.
            log.exception('failed to send debugPageData update')

    def bind(self):
        """Start polling the debug data once per second, beginning immediately."""
        self.loop = task.LoopingCall(self.watch)
        self.loop.start(1, now=True)

    def unbind(self):
        """Stop polling; safe to call if bind() never ran or the loop already stopped."""
        if self.loop is not None and self.loop.running:
            self.loop.stop()
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
324 |
e0e623c01a69
ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents:
798
diff
changeset
|
325 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
if __name__ == '__main__':
    # Script entry point: load config, connect to the brokers, build one
    # MqttStatementSource per configured source, then serve the graph,
    # debug page and metrics over HTTP.
    arg = docopt("""
    Usage: mqtt_to_rdf.py [options]

    -v        Verbose
    --cs=STR  Only process config filenames with this substring
    """)
    verboseLogging(arg['-v'])
    logging.getLogger('mqtt').setLevel(logging.INFO)
    logging.getLogger('mqtt_client').setLevel(logging.INFO)

    # Merge every conf/*.n3 file (optionally filtered by --cs) into one config graph.
    config = Graph()
    for fn in Path('.').glob('conf/*.n3'):
        if not arg['--cs'] or str(arg['--cs']) in str(fn):
            log.debug(f'loading {fn}')
            config.parse(str(fn), format='n3')
        else:
            log.debug(f'skipping {fn}')

    masterGraph = PatchableGraph()

    brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
    brokerPort = 10210

    debugPageData = {
        # schema in index.ts
        'server': f'{brokerHost}:{brokerPort}',
        'messagesSeen': 0,
        'subscribed': [],
    }

    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)

    # Raises KeyError at startup if INFLUXDB_SERVICE_HOST is not set.
    influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])

    srcs = []
    for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
        srcs.append(
            MqttStatementSource(src,
                                config,
                                masterGraph,
                                mqtt=mqtt,
                                internalMqtt=internalMqtt,
                                debugPageData=debugPageData,
                                influxExport=influxExport))
    log.info(f'set up {len(srcs)} sources')

    port = 10018
    reactor.listenTCP(port,
                      cyclone.web.Application([
                          (r"/()", cyclone.web.StaticFileHandler, {
                              "path": ".",
                              "default_filename": "index.html"
                          }),
                          # The dot is escaped so only the literal file name
                          # 'bundle.js' matches this route.
                          (r"/build/(bundle\.js)", cyclone.web.StaticFileHandler, {
                              "path": "build"
                          }),
                          (r"/graph/mqtt", CycloneGraphHandler, {
                              'masterGraph': masterGraph
                          }),
                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
                              'masterGraph': masterGraph
                          }),
                          (r'/debugPageData', DebugPageData),
                          (r'/metrics', Metrics),
                      ],
                                              mqtt=mqtt,
                                              internalMqtt=internalMqtt,
                                              masterGraph=masterGraph,
                                              debugPageData=debugPageData,
                                              debug=arg['-v']),
                      interface='::')
    log.info('serving on %s', port)

    reactor.run()