annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 1640:4bb6f593ebf3

speedups: abort some rules faster
author drewp@bigasterisk.com
date Wed, 15 Sep 2021 23:56:02 -0700
parents 1c36ad1eb8b3
children 78024b27f9ec
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
4 import glob
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
5 import json
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
6 import logging
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
7 import os
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
8
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
9 from mqtt_message import graphFromMessage
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
10 import os
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
11 import time
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
12 from dataclasses import dataclass
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
13 from pathlib import Path
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
14 from typing import Callable, Sequence, Set, Tuple, Union, cast
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
15
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
16 import cyclone.sse
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
17 import cyclone.web
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
18 import export_to_influxdb
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
19 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
20 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
21 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
22 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
23 from docopt import docopt
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
24 from export_to_influxdb import InfluxExporter
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
25 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
26 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
27 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
28 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
29 from prometheus_client.registry import REGISTRY
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
30 from rdfdb.rdflibpatch import graphFromQuads
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
31 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
32 from rdflib.graph import ConjunctiveGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
33 from rdflib.term import Node
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
34 from rx.core import Observable
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
35 from rx.core.typing import Mapper
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
36 from standardservice.logsetup import log, verboseLogging
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
37 from twisted.internet import reactor, task
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
38 from inference import Inference
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
39 from button_events import button_events
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
40 from patch_cyclone_sse import patchCycloneSse
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
41
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
42 ROOM = Namespace('http://projects.bigasterisk.com/room/')
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
43 MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
44 collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
45
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
46 patchCycloneSse()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
47
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
48
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
49 def logGraph(debug: Callable, label: str, graph: Graph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
50 n3 = cast(bytes, graph.serialize(format="n3"))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
51 debug(label + ':\n' + n3.decode('utf8'))
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
52
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
53
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
54 def appendLimit(lst, elem, n=10):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
55 del lst[:len(lst) - n + 1]
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
56 lst.append(elem)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
57
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
58
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
59 def parseDurationLiteral(lit: Literal) -> float:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
60 if lit.endswith('s'):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
61 return float(lit.split('s')[0])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
62 raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
63
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
64
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
65 @dataclass
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
66 class StreamPipelineStep:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
67 uri: URIRef # a :MqttStatementSource
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
68 config: Graph
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
69
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
70 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
71 return inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
72
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
73
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
74 class Filters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
75
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
76 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
77 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
78 if jsonEq:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
79 required = json.loads(jsonEq.toPython())
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
80
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
81 def eq(jsonBytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
82 msg = json.loads(jsonBytes.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
83 return msg == required
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
84
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
85 outStream = rx.operators.filter(eq)(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
86 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
87 outStream = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
88 return outStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
89
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
90
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
91 class Parser(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
92
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
93 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
94 parser = self.getParser()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
95 return parser(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
96
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
97 def getParser(self) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
98 parserType = cast(URIRef, self.config.value(self.uri, ROOM['parser']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
99 func = self.getParserFunc(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
100 return rx.operators.map(cast(Mapper, func))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
101
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
102 def getParserFunc(self, parserType: URIRef) -> Callable[[bytes], Node]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
103 if parserType == XSD.double:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
104 return lambda v: Literal(float(v))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
105 elif parserType == ROOM['tagIdToUri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
106 return self.tagIdToUri
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
107 elif parserType == ROOM['onOffBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
108 return lambda v: Literal(0.0 if v == b'OFF' else 1.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
109 elif parserType == ROOM['jsonBrightness']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
110 return self.parseJsonBrightness
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
111 elif ROOM['ValueMap'] in self.config.objects(parserType, RDF.type):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
112 return lambda v: self.remap(parserType, v.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
113 elif parserType == ROOM['rfCode']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
114 return self.parseJsonRfCode
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
115 elif parserType == ROOM['tradfri']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
116 return self.parseTradfriMessage
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
117 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
118 raise NotImplementedError(parserType)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
119
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
120 def tagIdToUri(self, value: bytes) -> URIRef:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
121 justHex = value.decode('utf8').replace('-', '').lower()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
122 int(justHex, 16) # validate
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
123 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
124
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
125 def parseJsonBrightness(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
126 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
127 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
128
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
129 def remap(self, parser, valueStr: str) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
130 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
131 value = Literal(valueStr)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
132 for entry in g.objects(parser, ROOM['map']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
133 if value == g.value(entry, ROOM['from']):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
134 to_ = g.value(entry, ROOM['to'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
135 if not isinstance(to_, Node):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
136 raise TypeError(f'{to_=}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
137 return to_
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
138 raise KeyError(value)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
139
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
140 def parseJsonRfCode(self, mqttValue: bytes):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
141 msg = json.loads(mqttValue.decode('utf8'))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
142 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
143
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
144 def parseTradfriMessage(self, mqttValue: bytes) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
145 log.info(f'trad {mqttValue}')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
146 return Literal('todo')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
147
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
148
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
149 class Converters(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
150
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
151 def makeOutputStream(self, inStream: Observable) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
152 out = inStream
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
153 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
154 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
155 out = self.conversionStep(conv)(out)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
156 return out
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
157
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
158 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
159 g = self.config
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
160 if conv == ROOM['celsiusToFarenheit']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
161
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
162 return rx.operators.map(cast(Mapper, self.c2f))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
163 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
164 threshold = cast(Literal, g.value(conv, ROOM['ignoreValueBelow'])).toPython()
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
165 return rx.operators.filter(lambda value: cast(Literal, value).toPython() >= threshold)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
166 elif conv == ROOM['buttonPress']:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
167 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
168 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
169 else:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
170 raise NotImplementedError(conv)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
171
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
172 def c2f(self, value: Literal) -> Node:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
173 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
174
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
175
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
176 class Rdfizer(StreamPipelineStep):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
177
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
178 def makeOutputStream(self, inStream: Observable) -> Observable:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
179 plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
180 log.debug(f'{self.uri=} has {len(plans)=}')
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
181 if not plans:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
182 return rx.empty()
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
183 outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
184 return outputQuadsSets
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
185
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
186 def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
187
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
188 def quadsFromValue(valueNode):
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
189 return set([(self.uri, self.config.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
190
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
191 def emptyQuads(element) -> Set[Tuple]:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
192 return set([])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
193
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
194 quads = rx.operators.map(cast(Mapper, quadsFromValue))(inStream)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
195
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
196 dur = self.config.value(plan, ROOM['statementLifetime'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
197 if dur is not None:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
198 sec = parseDurationLiteral(dur)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
199 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
200 quads = quads.pipe(
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
201 rx.operators.debounce(sec, loop),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
202 rx.operators.map(cast(Mapper, emptyQuads)),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
203 rx.operators.merge(quads),
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
204 )
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
205
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
206 return quads
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
207
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
208
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
209 def truncTime():
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
210 return round(time.time(), 3)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
211
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
212
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
213 def tightN3(node: Union[URIRef, Literal]) -> str:
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
214 return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
215
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
216
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
217 def serializeWithNs(graph: Graph, hidePrefixes=False) -> str:
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
218 graph.bind('', ROOM)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
219 n3 = cast(bytes, graph.serialize(format='n3')).decode('utf8')
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
220 if hidePrefixes:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
221 n3 = ''.join(line for line in n3.splitlines(keepends=True) if not line.strip().startswith('@prefix'))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
222 return n3
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
223
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
224
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
225 class MqttStatementSource:
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
226
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
227 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData,
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
228 influxExport: InfluxExporter, inference: Inference):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
229 self.uri = uri
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
230 self.config = config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
231 self.masterGraph = masterGraph
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
232 self.debugPageData = debugPageData
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
233 self.mqtt = mqtt # deprecated
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
234 self.internalMqtt = internalMqtt
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
235 self.influxExport = influxExport
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
236 self.inference = inference
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
237
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
238 self.mqttTopic = self.topicFromConfig(self.config)
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
239 log.debug(f'new mqttTopic {self.mqttTopic}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
240
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
241 self.debugSub = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
242 'topic': self.mqttTopic.decode('ascii'),
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
243 'recentMessageGraphs': [],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
244 'recentMetrics': [],
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
245 'currentOutputGraph': {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
246 't': 1,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
247 'n3': "(n3)"
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
248 },
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
249 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
250 self.debugPageData['subscribed'].append(self.debugSub)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
251
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
252 rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
253 rawBytes.subscribe(on_next=self.countIncomingMessage)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
254
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
255 rawBytes.subscribe_(self.onMessage)
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
256
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
257 def onMessage(self, raw: bytes):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
258 g = graphFromMessage(self.mqttTopic, raw)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
259 logGraph(log.debug, 'message graph', g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
260 appendLimit(
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
261 self.debugSub['recentMessageGraphs'],
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
262 { #
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
263 't': truncTime(),
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
264 'n3': serializeWithNs(g, hidePrefixes=True)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
265 })
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
266
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
267 implied = self.inference.infer(g)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
268 self.updateMasterGraph(implied)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
269
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
270 def topicFromConfig(self, config) -> bytes:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
271 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
272 return b'/'.join(t.encode('ascii') for t in topicParts)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
273
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
274 def subscribeMqtt(self, topic: bytes):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
275 # goal is to get everyone on the internal broker and eliminate this
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
276 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
277 return mqtt.subscribe(topic)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
278
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
279 def countIncomingMessage(self, msg: bytes):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
280 self.debugPageData['messagesSeen'] += 1
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
281 MESSAGES_SEEN.inc()
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
282
1583
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
283 def updateInflux(self, newGraphs):
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
284 for g in newGraphs:
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
285 self.influxExport.exportToInflux(g)
b0608eb6e90c dead code, sort reqs
drewp@bigasterisk.com
parents: 1577
diff changeset
286
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
287 def updateMasterGraph(self, newGraph):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
288 log.debug(f'{self.uri} update to {len(newGraph)} statements')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
289
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
290 cg = ConjunctiveGraph()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
291 for stmt in newGraph:
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
292 cg.add(stmt + (self.uri,))
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
293 meas = stmt[0].split('/')[-1]
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
294 if meas.startswith('airQuality'):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
295 where_prefix, type_ = meas[len('airQuality'):].split('door')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
296 where = where_prefix + 'door'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
297 metric = 'air'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
298 tags = {'loc': where.lower(), 'type': type_.lower()}
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
299 val = stmt[2].toPython()
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
300 if metric not in collectors:
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
301 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
302
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
303 collectors[metric].labels(**tags).set(val)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
304
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
305 self.masterGraph.patchSubgraph(self.uri, cg)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
306 self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(cg, hidePrefixes=True)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
307
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
308
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
309 class Metrics(cyclone.web.RequestHandler):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
310
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
311 def get(self):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
312 self.add_header('content-type', 'text/plain')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
313 self.write(generate_latest(REGISTRY))
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
314
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
315
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
316 class DebugPageData(cyclone.sse.SSEHandler):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
317
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
318 def __init__(self, application, request):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
319 cyclone.sse.SSEHandler.__init__(self, application, request)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
320 self.lastSent = None
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
321
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
322 def watch(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
323 try:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
324 dpd = self.settings.debugPageData
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
325 js = json.dumps(dpd, sort_keys=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
326 if js != self.lastSent:
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
327 log.debug('sending dpd update')
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
328 self.sendEvent(message=js.encode('utf8'))
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
329 self.lastSent = js
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
330 except Exception:
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
331 import traceback
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
332 traceback.print_exc()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
333
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
334 def bind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
335 self.loop = task.LoopingCall(self.watch)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
336 self.loop.start(1, now=True)
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
337
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
338 def unbind(self):
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
339 self.loop.stop()
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
340
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
341
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
342 if __name__ == '__main__':
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
343 arg = docopt("""
733
9ca69f2be87b more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents: 732
diff changeset
344 Usage: mqtt_to_rdf.py [options]
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
345
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
346 -v Verbose
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
347 --cs=STR Only process config filenames with this substring
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
348 """)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
349 verboseLogging(arg['-v'])
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
350 logging.getLogger('mqtt').setLevel(logging.INFO)
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
351 logging.getLogger('mqtt_client').setLevel(logging.INFO)
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
352 logging.getLogger('infer').setLevel(logging.INFO)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
353 log.info('log start')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
354
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
355 config = Graph()
798
cdc76c84e3e2 move conf into subdir
drewp@bigasterisk.com
parents: 797
diff changeset
356 for fn in Path('.').glob('conf/*.n3'):
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
357 if not arg['--cs'] or str(arg['--cs']) in str(fn):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
358 log.debug(f'loading {fn}')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
359 config.parse(str(fn), format='n3')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
360 else:
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
361 log.debug(f'skipping {fn}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
362
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
363 masterGraph = PatchableGraph()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
364
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
365 brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
366 brokerPort = 10210
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
367
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
368 debugPageData = {
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
369 # schema in index.ts
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
370 'server': f'{brokerHost}:{brokerPort}',
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
371 'messagesSeen': 0,
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
372 'subscribed': [],
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
373 }
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
374
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
375 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
376 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
377
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
378 influxExport = InfluxExporter(config, influxHost=os.environ['INFLUXDB_SERVICE_HOST'])
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
379
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
380 inference = Inference()
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
381 inference.setRules(config)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
382 expandedConfig = inference.infer(config)
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
383 log.info('expanded config:')
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
384 for stmt in sorted(expandedConfig):
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
385 log.info(f' {stmt}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
386 srcs = []
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
387 for src in sorted(expandedConfig.subjects(RDF.type, ROOM['MqttStatementSource'])):
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
388 srcs.append(
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
389 MqttStatementSource(src,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
390 config,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
391 masterGraph,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
392 mqtt=mqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
393 internalMqtt=internalMqtt,
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
394 debugPageData=debugPageData,
1629
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
395 influxExport=influxExport,
1c36ad1eb8b3 do inference on config. backend for new ui columns. rm some of the old filter pipeline
drewp@bigasterisk.com
parents: 1583
diff changeset
396 inference=inference))
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
397 log.info(f'set up {len(srcs)} sources')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
398
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
399 port = 10018
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
400 reactor.listenTCP(port,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
401 cyclone.web.Application([
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
402 (r"/()", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
403 "path": ".",
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
404 "default_filename": "index.html"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
405 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
406 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
407 "path": "build"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
408 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
409 (r"/graph/mqtt", CycloneGraphHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
410 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
411 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
412 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
413 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
414 }),
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
415 (r'/debugPageData', DebugPageData),
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
416 (r'/metrics', Metrics),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
417 ],
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
418 mqtt=mqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
419 internalMqtt=internalMqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
420 masterGraph=masterGraph,
799
e0e623c01a69 ts build is part of docker now; new web debug console
drewp@bigasterisk.com
parents: 798
diff changeset
421 debugPageData=debugPageData,
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
422 debug=arg['-v']),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
423 interface='::')
1577
6ddc5e037f15 big fixes and rewrites. emitting rdf works, not influx export yet
drewp@bigasterisk.com
parents: 799
diff changeset
424 log.info('serving on %s', port)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
425
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
426 reactor.run()