annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1642:78024b27f9ec

serve graph/config
author drewp@bigasterisk.com
date Fri, 17 Sep 2021 11:01:06 -0700
parents 1c36ad1eb8b3
children c04b5303eb08
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
4 import glob
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
5 import json
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
6 import logging
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
7 import os
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
8
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
9 from rdfdb.patch import Patch
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
10
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
11 from mqtt_message import graphFromMessage
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
12 import os
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
13 import time
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
14 from dataclasses import dataclass
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
15 from pathlib import Path
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
16 from typing import Callable, Sequence, Set, Tuple, Union, cast
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
17
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
18 import cyclone.sse
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
19 import cyclone.web
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
20 import export_to_influxdb
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
21 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
22 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
23 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
24 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
25 from docopt import docopt
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
26 from export_to_influxdb import InfluxExporter
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
27 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
28 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
29 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
30 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
31 from prometheus_client.registry import REGISTRY
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
32 from rdfdb.rdflibpatch import graphFromQuads
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
33 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
34 from rdflib.graph import ConjunctiveGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
35 from rdflib.term import Node
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
36 from rx.core import Observable
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
37 from rx.core.typing import Mapper
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
38 from standardservice.logsetup import log, verboseLogging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
39 from twisted.internet import reactor, task
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
40 from inference import Inference
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
41 from button_events import button_events
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
42 from patch_cyclone_sse import patchCycloneSse
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
43
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
# Vocabulary namespace; the pipeline steps below look up config predicates
# and parser/conversion identifiers as ROOM[...] terms.
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Prometheus counter registered as 'mqtt_messages_seen' (empty help text).
# NOTE(review): the code that increments it is outside this excerpt.
MESSAGES_SEEN = Counter('mqtt_messages_seen', '')

# Module-level registry, initially empty.
# NOTE(review): what gets stored here is not visible in this excerpt.
collectors = {}

# Runs once at import time, before any handlers are defined.
# NOTE(review): presumably monkey-patches cyclone.sse; see patch_cyclone_sse.
patchCycloneSse()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
49
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
50
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
def logGraph(debug: Callable, label: str, graph: Graph):
    """Emit an N3 rendering of `graph` through the `debug` callable.

    Args:
        debug: called exactly once with a single string: `label`, then
            a colon and newline, then the N3 text of the graph.
        label: heading placed before the serialized graph.
        graph: graph to serialize with format "n3".
    """
    n3 = graph.serialize(format="n3")
    # Older rdflib releases return bytes from serialize(); newer ones return
    # str. Accept both so the helper does not die on `.decode`.
    if isinstance(n3, bytes):
        n3 = n3.decode('utf8')
    debug(label + ':\n' + n3)
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
54
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
55
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
def appendLimit(lst, elem, n=10):
    """Append `elem` to `lst` in place, keeping at most `n` items.

    The oldest items (front of the list) are dropped to make room. The list
    always ends with `elem`, so for n < 1 it ends up holding just `elem`.

    Args:
        lst: list to mutate.
        elem: item to append.
        n: maximum length of `lst` after the call.
    """
    excess = len(lst) - n + 1
    # Only trim when the list is actually full. A negative `excess` used
    # directly as a slice stop would count from the END of the list and
    # delete items while the list is still below the limit.
    if excess > 0:
        del lst[:excess]
    lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
59
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
60
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
def parseDurationLiteral(lit: Literal) -> float:
    """Convert a duration literal such as '5s' or '1.5s' into seconds.

    Only a number followed by a single trailing 's' is understood.

    Args:
        lit: the duration text (string methods are used on it directly).

    Returns:
        The number of seconds as a float.

    Raises:
        NotImplementedError: for every other form, including other units
            that merely end in 's' (e.g. '500ms') and text with extra
            characters after the number (e.g. '5sxs').
    """
    if lit.endswith('s'):
        # Strip exactly the one unit suffix; splitting on every 's' would
        # silently discard whatever follows the first 's'.
        try:
            return float(lit[:-1])
        except ValueError as e:
            raise NotImplementedError(f'duration literal: {lit}') from e
    raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
65
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
66
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
@dataclass
class StreamPipelineStep:
    """Base class for one stage of a stream pipeline.

    Carries the URI the stage is configured under and the config graph it
    reads from. Subclasses override makeOutputStream; this base
    implementation passes the stream through untouched.
    """
    uri: URIRef  # a :MqttStatementSource
    config: Graph

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return the stream for the next stage; here, `inStream` itself."""
        return inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
74
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
75
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Filters(StreamPipelineStep):
    """Pipeline step that keeps only messages whose JSON payload equals a configured value."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply the ROOM['filterPayloadJsonEquals'] filter, if one is configured.

        Args:
            inStream: stream of raw payloads (each is `.decode('utf8')`-ed,
                so bytes are expected).

        Returns:
            `inStream` itself when self.uri has no (or a falsy)
            filterPayloadJsonEquals value; otherwise a stream carrying only
            the payloads whose parsed JSON equals the parsed config value.
        """
        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
        if not jsonEq:
            return inStream

        required = json.loads(jsonEq.toPython())

        def eq(jsonBytes) -> bool:
            try:
                msg = json.loads(jsonBytes.decode('utf8'))
            except ValueError as e:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # ValueError subclasses. A payload that cannot be parsed
                # cannot equal `required`; letting the exception escape the
                # predicate would error the stream and end it.
                log.debug('%s: ignoring unparseable payload %r: %s', self.uri, jsonBytes, e)
                return False
            return msg == required

        return rx.operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
91
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
92
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Parser(StreamPipelineStep):
    """Pipeline step that converts raw MQTT payload bytes into an RDF node.

    The conversion is selected by the ROOM['parser'] value that the config
    graph holds for self.uri.
    """

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Return `inStream` with the configured parser mapped over it."""
        return self.getParser()(inStream)

    def getParser(self) -> Callable[[Observable], Observable]:
        """Build the rx `map` operator for the parser configured on self.uri."""
        configured = self.config.value(self.uri, ROOM['parser'])
        convert = self.getParserFunc(cast(URIRef, configured))
        return rx.operators.map(cast(Mapper, convert))

    def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
        """Pick the bytes -> Node function for `parserType`.

        The checks run in a fixed order; the first one that matches wins.

        Raises:
            NotImplementedError: when `parserType` matches none of them.
        """

        def asDouble(v):
            return Literal(float(v))

        def onOff(v):
            # exactly b'OFF' is 0.0; every other payload counts as on
            return Literal(0.0 if v == b'OFF' else 1.0)

        def viaValueMap(v):
            return self.remap(parserType, v.decode('utf8'))

        if parserType == XSD.double:
            return asDouble
        if parserType == ROOM['tagIdToUri']:
            return self.tagIdToUri
        if parserType == ROOM['onOffBrightness']:
            return onOff
        if parserType == ROOM['jsonBrightness']:
            return self.parseJsonBrightness
        # a parser resource that the config types as a :ValueMap
        if ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
            return viaValueMap
        if parserType == ROOM['rfCode']:
            return self.parseJsonRfCode
        if parserType == ROOM['tradfri']:
            return self.parseTradfriMessage
        raise NotImplementedError(parserType)

    def tagIdToUri(self, value: bytes) -> URIRef:
        """Turn a hex tag id (dashes allowed, any case) into an rfidCard URI.

        Raises:
            ValueError: if, after removing dashes, the text is not hexadecimal.
        """
        text = value.decode('utf8')
        hexDigits = text.replace('-', '').lower()
        int(hexDigits, 16)  # validation only; the parsed number is discarded
        return URIRef(f'http://bigasterisk.com/rfidCard/{hexDigits}')

    def parseJsonBrightness(self, mqttValue: bytes):
        """Read a JSON object with 'state' and 'brightness' keys.

        Returns Literal(brightness / 255) when state is 'ON'; for any other
        state returns Literal(0.0) without reading 'brightness'.
        """
        msg = json.loads(mqttValue.decode('utf8'))
        if msg['state'] == 'ON':
            level = float(msg['brightness'] / 255)
        else:
            level = 0.0
        return Literal(level)

    def remap(self, parser, valueStr: str) -> Node:
        """Translate `valueStr` through the ROOM['map'] entries of `parser`.

        Returns the ROOM['to'] node of an entry whose ROOM['from'] equals
        Literal(valueStr).

        Raises:
            TypeError: if the matching entry's 'to' value is not a Node.
            KeyError: if no entry matches.
        """
        g = self.config
        value = Literal(valueStr)
        for entry in g.objects(parser, ROOM['map']):
            matches = value == g.value(entry, ROOM['from'])
            if not matches:
                continue
            to_ = g.value(entry, ROOM['to'])
            if isinstance(to_, Node):
                return to_
            raise TypeError(f'{to_=}')
        raise KeyError(value)

    def parseJsonRfCode(self, mqttValue: bytes):
        """Read JSON keys 'code0' and 'code1'; return them as one 16-hex-digit Literal."""
        msg = json.loads(mqttValue.decode('utf8'))
        codes = (msg['code0'], msg['code1'])
        return Literal('%08x%08x' % codes)

    def parseTradfriMessage(self, mqttValue: bytes) -> Node:
        """Placeholder: logs the payload and returns the fixed Literal 'todo'."""
        log.info(f'trad {mqttValue}')
        return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
149
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
150
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Converters(StreamPipelineStep):
    """Pipeline step that chains the conversions listed under ROOM['conversions']."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Apply each configured conversion, in list order, on top of `inStream`."""
        g = self.config
        conversions = g.value(self.uri, ROOM['conversions'])
        stream = inStream
        for conv in g.items(conversions):
            step = self.conversionStep(conv)
            stream = step(stream)
        return stream

    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
        """Return the rx operator that implements the conversion `conv`.

        Handles ROOM['celsiusToFarenheit'], any node that carries a
        ROOM['ignoreValueBelow'] value, and ROOM['buttonPress'].

        Raises:
            NotImplementedError: for any other `conv`.
        """
        if conv == ROOM['celsiusToFarenheit']:
            return rx.operators.map(cast(Mapper, self.c2f))

        limit = self.config.value(conv, ROOM['ignoreValueBelow'], default=None)
        if limit is not None:
            threshold = cast(Literal, limit).toPython()

            def atOrAboveThreshold(value):
                return cast(Literal, value).toPython() >= threshold

            return rx.operators.filter(atOrAboveThreshold)

        if conv == ROOM['buttonPress']:
            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)

        raise NotImplementedError(conv)

    def c2f(self, value: Literal) -> Node:
        """Convert a Celsius literal to Fahrenheit, rounded to 2 decimal places."""
        celsius = cast(float, value.toPython())
        return Literal(round(celsius * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
176
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
177
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
class Rdfizer(StreamPipelineStep):
    """Pipeline step that turns value nodes into sets of quads, one stream per graphStatements plan."""

    def makeOutputStream(self, inStream: Observable) -> Observable:
        """Combine the per-plan quad streams of self.uri with rx.combine_latest.

        Returns rx.empty() when self.uri has no ROOM['graphStatements'].
        """
        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
        log.debug(f'{self.uri=} has {len(plans)=}')
        if plans:
            perPlan = [self.makeQuads(inStream, plan) for plan in plans]
            return rx.combine_latest(*perPlan)
        return rx.empty()

    def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
        """Build the stream of quad sets for one plan.

        Each incoming value becomes a one-element set holding
        (self.uri, <plan's outputPredicate>, value, self.uri).

        If the plan has a ROOM['statementLifetime'], a debounced copy of that
        stream, mapped to the empty set, is merged in as well. Assuming
        standard Rx debounce semantics, this yields an empty set once no new
        value has arrived for that many seconds.
        """

        def quadsFromValue(valueNode):
            # the predicate is looked up again for every value
            predicate = self.config.value(plan, ROOM['outputPredicate'])
            return {(self.uri, predicate, valueNode, self.uri)}

        def emptyQuads(element) -> Set[Tuple]:
            return set()

        valueQuads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)

        dur = self.config.value(plan, ROOM['statementLifetime'])
        if dur is None:
            return valueQuads

        sec = parseDurationLiteral(dur)
        loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
        quietFor = rx.operators.debounce(sec, loop)
        toEmpty = rx.operators.map(cast(Mapper, emptyQuads))
        withLive = rx.operators.merge(valueQuads)
        return valueQuads.pipe(quietFor, toEmpty, withLive)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
209
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
def truncTime():
    """Return the current ``time.time()`` value rounded to 3 decimal places."""
    now = time.time()
    return round(now, 3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
213
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
214
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
def tightN3(node: Union[URIRef, Literal]) -> str:
    """Return ``node.n3()`` with the XML Schema namespace URI replaced by ``xsd:``."""
    xsdNamespace = 'http://www.w3.org/2001/XMLSchema#'
    n3Text = node.n3()
    return n3Text.replace(xsdNamespace, 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
217
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
218
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
    """Serialize ``graph`` as N3 text with ROOM bound to the empty prefix.

    Note that the prefix binding is applied to the graph that was passed in.

    Args:
        graph: the graph to serialize.
        hidePrefixes: when true, drop every output line whose stripped text
            starts with ``@prefix``.

    Returns:
        The N3 serialization as a ``str``.
    """
    graph.bind('', ROOM)
    serialized = graph.serialize(format='n3')
    # Older rdflib returns bytes from serialize(); rdflib 6+ returns str.
    # Decode only when needed so both versions work.
    if isinstance(serialized, bytes):
        n3 = serialized.decode('utf8')
    else:
        n3 = serialized
    if hidePrefixes:
        n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
    return n3
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
225
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
226
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
class MqttStatementSource:
    """Subscribes to one MQTT topic and turns its messages into RDF statements.

    Each incoming message is converted to a graph, run through inference, and
    the implied statements replace this source's subgraph (context
    ``self.uri``) in the master graph. A per-source entry in the shared
    debug-page data is kept up to date along the way.
    """

    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
                 influxExport: InfluxExporter, inference: Inference):
        """Store collaborators, register a debug entry, and subscribe to the topic.

        Args:
            uri: identifies this source; also used as the output graph context.
            config: graph read for this source's ``room:mqttTopic``.
            masterGraph: graph whose ``uri`` subgraph is patched on each message.
            mqtt: deprecated broker client, used for most topics.
            internalMqtt: broker client used for topics starting with ``frontdoorlock``.
            debugPageData: shared dict with ``'subscribed'`` and ``'messagesSeen'`` keys.
            influxExport: exporter used by ``updateInflux``.
            inference: object whose ``infer(graph)`` produces the output statements.
        """
        self.uri = uri
        self.config = config
        self.masterGraph = masterGraph
        self.debugPageData = debugPageData
        self.mqtt = mqtt  # deprecated
        self.internalMqtt = internalMqtt
        self.influxExport = influxExport
        self.inference = inference

        self.mqttTopic = self.topicFromConfig(self.config)
        log.debug(f'new mqttTopic {self.mqttTopic}')

        # Per-source section of the debug page. 'currentOutputGraph' starts
        # with placeholders and is overwritten by updateMasterGraph.
        self.debugSub = {
            'topic': self.mqttTopic.decode('ascii'),
            'recentMessageGraphs': [],
            'recentMetrics': [],
            'currentOutputGraph': {
                't': 1,
                'n3': "(n3)"
            },
        }
        self.debugPageData['subscribed'].append(self.debugSub)

        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
        rawBytes.subscribe(on_next=self.countIncomingMessage)

        rawBytes.subscribe_(self.onMessage)

    def onMessage(self, raw: bytes):
        """Handle one raw MQTT payload: record it for debugging, infer, publish."""
        g = graphFromMessage(self.mqttTopic, raw)
        logGraph(log.debug, 'message graph', g)
        appendLimit(
            self.debugSub['recentMessageGraphs'],
            {  #
                't': truncTime(),
                'n3': serializeWithNs(g, hidePrefixes=True)
            })

        implied = self.inference.infer(g)
        self.updateMasterGraph(implied)

    def topicFromConfig(self, config) -> bytes:
        """Return the topic as bytes: the ``room:mqttTopic`` list items joined by ``/``."""
        topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
        return b'/'.join(t.encode('ascii') for t in topicParts)

    def subscribeMqtt(self, topic: bytes):
        """Subscribe to ``topic`` on the appropriate broker and return the result."""
        # goal is to get everyone on the internal broker and eliminate this
        mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
        return mqtt.subscribe(topic)

    def countIncomingMessage(self, msg: bytes):
        """Bump the debug-page and MESSAGES_SEEN counters; ``msg`` itself is unused."""
        self.debugPageData['messagesSeen'] += 1
        MESSAGES_SEEN.inc()

    def updateInflux(self, newGraphs):
        """Pass each graph in ``newGraphs`` to the influx exporter."""
        for g in newGraphs:
            self.influxExport.exportToInflux(g)

    def updateMasterGraph(self, newGraph):
        """Replace this source's subgraph in the master graph with ``newGraph``.

        Statements whose subject's last path segment starts with
        ``airQuality`` are also written to the ``air`` gauge, labelled with
        a location and a type parsed from that segment.
        """
        log.debug(f'{self.uri} update to {len(newGraph)} statements')

        cg = ConjunctiveGraph()
        for stmt in newGraph:
            cg.add(stmt + (self.uri,))
            meas = stmt[0].split('/')[-1]
            if meas.startswith('airQuality'):
                # The name is expected to look like airQuality<X>door<Type>.
                # partition() keeps an unexpected name from raising and
                # aborting the whole graph update.
                where_prefix, sep, type_ = meas[len('airQuality'):].partition('door')
                if not sep:
                    log.warning(f'cannot parse air quality measurement name {meas!r}; skipping metric')
                    continue
                where = where_prefix + 'door'
                metric = 'air'
                tags = {'loc': where.lower(), 'type': type_.lower()}
                val = stmt[2].toPython()
                if metric not in collectors:
                    collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())

                collectors[metric].labels(**tags).set(val)

        self.masterGraph.patchSubgraph(self.uri, cg)
        # Update the timestamp together with the serialization; previously
        # 't' stayed at its initial placeholder value forever.
        currentOutput = self.debugSub['currentOutputGraph']
        currentOutput['t'] = truncTime()
        currentOutput['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
309
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
310
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
class Metrics(cyclone.web.RequestHandler):
    """Request handler that responds with ``generate_latest(REGISTRY)`` as text/plain."""

    def get(self):
        """Write the current registry output to the response."""
        self.add_header('content-type', 'text/plain')
        exposition = generate_latest(REGISTRY)
        self.write(exposition)
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
316
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
317
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
class DebugPageData(cyclone.sse.SSEHandler):
    """SSE handler that sends the debug-page data as JSON whenever it changes."""

    def __init__(self, application, request):
        super().__init__(application, request)
        # JSON text of the last event sent; None until the first send.
        self.lastSent = None

    def watch(self):
        """Send the current debug-page data if it differs from the last send.

        Any exception is caught and its traceback printed rather than
        propagated to the caller.
        """
        try:
            snapshot = json.dumps(self.settings.debugPageData, sort_keys=True)
            if snapshot == self.lastSent:
                return
            log.debug('sending dpd update')
            self.sendEvent(message=snapshot.encode('utf8'))
            self.lastSent = snapshot
        except Exception:
            import traceback
            traceback.print_exc()

    def bind(self):
        """Start calling ``watch`` with an interval of 1, beginning immediately."""
        poller = task.LoopingCall(self.watch)
        self.loop = poller
        poller.start(1, now=True)

    def unbind(self):
        """Stop the looping call started in ``bind``."""
        self.loop.stop()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
342
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
343
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
344 if __name__ == '__main__':
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
345 arg = docopt("""
733
9ca69f2be87b more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents: 732
diff changeset
346 Usage: mqtt_to_rdf.py [options]
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
347
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
348 -v Verbose
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
349 --cs=STR Only process config filenames with this substring
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
350 """)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
351 verboseLogging(arg['-v'])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
352 logging.getLogger('mqtt').setLevel(logging.INFO)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
353 logging.getLogger('mqtt_client').setLevel(logging.INFO)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
354 logging.getLogger('infer').setLevel(logging.INFO)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
355 log.info('log start')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
356
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
357 config = Graph()
798
cdc76c84e3e2 move conf into subdir
drewp@bigasterisk.com
parents: 797
diff changeset
358 for fn in Path('.').glob('conf/*.n3'):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
359 if not arg['--cs'] or str(arg['--cs']) in str(fn):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
360 log.debug(f'loading {fn}')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
361 config.parse(str(fn), format='n3')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
362 else:
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
363 log.debug(f'skipping {fn}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
364
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
365 masterGraph = PatchableGraph()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
366
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
367 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
368 brokerPort = 10210
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
369
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
370 debugPageData = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
371 # schema in index.ts
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
372 'server': f'{brokerHost}:{brokerPort}',
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
373 'messagesSeen': 0,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
374 'subscribed': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
375 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
376
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
377 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
378 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
379
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
380 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
381
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
382 inference = Inference()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
383 inference.setRules(config)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
384 expandedConfig = inference.infer(config)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
385 log.info('expanded config:')
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
386 for stmt in sorted(expandedConfig):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
387 log.info(f' {stmt}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
388 srcs = []
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
389 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
390 srcs.append(
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
391 MqttStatementSource(src,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
392 config,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
393 masterGraph,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
394 mqtt=mqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
395 internalMqtt=internalMqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
396 debugPageData=debugPageData,
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
397 influxExport=influxExport,
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
398 inference=inference))
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
399 log.info(f'set up {len(srcs)} sources')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
400
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
401 peg = PatchableGraph()
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
402 peg.patch(Patch(addQuads=[(s,p,o,URIRef('/config')) for s,p,o in expandedConfig]))
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
403
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
404 port = 10018
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
405 reactor.listenTCP(port,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
406 cyclone.web.Application([
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
407 (r"/()", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
408 "path": ".",
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
409 "default_filename": "index.html"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
410 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
411 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
412 "path": "build"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
413 }),
1642
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
414 (r"/graph/config", CycloneGraphHandler, {
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
415 'masterGraph': peg,
78024b27f9ec serve graph/config
drewp@bigasterisk.com
parents: 1629
diff changeset
416 }),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
417 (r"/graph/mqtt", CycloneGraphHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
418 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
419 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
420 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
421 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
422 }),
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
423 (r'/debugPageData', DebugPageData),
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
424 (r'/metrics', Metrics),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
425 ],
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
426 mqtt=mqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
427 internalMqtt=internalMqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
428 masterGraph=masterGraph,
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
429 debugPageData=debugPageData,
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
430 debug=arg['-v']),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
431 interface='::')
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
432 log.info('serving on %s', port)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
433
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
434 reactor.run()