Mercurial > code > home > repos > homeauto
annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 797:a3e430b39177
reformat
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Dec 2020 20:55:24 -0800 |
parents | fc74ae6d5d68 |
children | cdc76c84e3e2 |
rev | line source |
---|---|
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
1 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
2 Subscribe to mqtt topics; generate RDF statements. |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
3 """ |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
4 import json |
767 | 5 from pathlib import Path |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
6 from typing import Callable, cast |
780 | 7 |
8 import cyclone.web | |
797 | 9 import prometheus_client |
10 import rx | |
11 import rx.operators | |
12 import rx.scheduler.eventloop | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
13 from docopt import docopt |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
14 from mqtt_client import MqttClient |
797 | 15 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph) |
16 from prometheus_client import Counter, Gauge, Histogram, Summary | |
17 from prometheus_client.exposition import generate_latest | |
18 from prometheus_client.registry import REGISTRY | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
19 from rdfdb.rdflibpatch import graphFromQuads |
797 | 20 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef |
780 | 21 from rdflib.term import Node |
791 | 22 from rx.core import Observable |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
23 from standardservice.logsetup import log, verboseLogging |
780 | 24 from twisted.internet import reactor |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
25 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
26 from button_events import button_events |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
27 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
28 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
29 |
797 | 30 collectors = {} |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
31 |
780 | 32 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
33 def parseDurationLiteral(lit: Literal) -> float: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
34 if lit.endswith('s'): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
35 return float(lit.split('s')[0]) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
36 raise NotImplementedError(f'duration literal: {lit}') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
37 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
38 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
39 class MqttStatementSource: |
780 | 40 |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
41 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt): |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
42 self.uri = uri |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
43 self.config = config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
44 self.masterGraph = masterGraph |
780 | 45 self.mqtt = mqtt # deprecated |
767 | 46 self.internalMqtt = internalMqtt |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
47 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
48 self.mqttTopic = self.topicFromConfig(self.config) |
767 | 49 log.debug(f'new mqttTopic {self.mqttTopic}') |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
50 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
51 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
52 #scales.init(self, statPath) |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
53 #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
54 |
767 | 55 rawBytes = self.subscribeMqtt(self.mqttTopic) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
56 rawBytes = self.addFilters(rawBytes) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
57 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
58 parsed = self.getParser()(rawBytes) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
59 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
60 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
61 for conv in g.items(g.value(self.uri, ROOM['conversions'])): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
62 parsed = self.conversionStep(conv)(parsed) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
63 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
64 outputQuadsSets = rx.combine_latest( |
780 | 65 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
66 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
67 outputQuadsSets.subscribe_(self.updateQuads) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
68 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
69 def addFilters(self, rawBytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
70 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
71 if jsonEq: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
72 required = json.loads(jsonEq.toPython()) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
73 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
74 def eq(jsonBytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
75 msg = json.loads(jsonBytes.decode('ascii')) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
76 return msg == required |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
77 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
78 rawBytes = rx.operators.filter(eq)(rawBytes) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
79 return rawBytes |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
80 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
81 def topicFromConfig(self, config) -> bytes: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
82 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
83 return b'/'.join(t.encode('ascii') for t in topicParts) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
84 |
767 | 85 def subscribeMqtt(self, topic): |
86 # goal is to get everyone on the internal broker and eliminate this | |
87 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt | |
88 return mqtt.subscribe(topic) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
89 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
90 def countIncomingMessage(self, _): |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
91 pass #self._mqttStats.fps.mark() |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
92 #self._mqttStats.count += 1 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
93 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
94 def getParser(self): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
95 g = self.config |
791 | 96 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
97 parser = g.value(self.uri, ROOM['parser']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
98 if parser == XSD.double: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
99 return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
100 elif parser == ROOM['tagIdToUri']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
101 return rx.operators.map(self.tagIdToUri) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
102 elif parser == ROOM['onOffBrightness']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
103 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0)) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
104 elif parser == ROOM['jsonBrightness']: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
105 return rx.operators.map(self.parseJsonBrightness) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
106 elif ROOM['ValueMap'] in g.objects(parser, RDF.type): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
107 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
108 elif parser == ROOM['rfCode']: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
109 return rx.operators.map(self.parseJsonRfCode) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
110 else: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
111 raise NotImplementedError(parser) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
112 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
113 def parseJsonBrightness(self, mqttValue: bytes): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
114 msg = json.loads(mqttValue.decode('ascii')) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
115 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
116 |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
117 def parseJsonRfCode(self, mqttValue: bytes): |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
118 msg = json.loads(mqttValue.decode('ascii')) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
119 return Literal('%08x%08x' % (msg['code0'], msg['code1'])) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
120 |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
121 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
122 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
123 if conv == ROOM['celsiusToFarenheit']: |
791 | 124 |
125 def c2f(value: Literal) -> Node: | |
126 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) | |
127 | |
128 return rx.operators.map(c2f) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
129 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
130 threshold = g.value(conv, ROOM['ignoreValueBelow']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
131 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) |
793
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
132 elif conv == ROOM['buttonPress']: |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
133 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) |
c3e3bd5dfa0b
add rf button mqtt message processing
drewp@bigasterisk.com
parents:
791
diff
changeset
|
134 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
135 else: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
136 raise NotImplementedError(conv) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
137 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
138 def makeQuads(self, parsed, plan): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
139 g = self.config |
780 | 140 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
141 def quadsFromValue(valueNode): |
780 | 142 return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
143 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
144 def emptyQuads(element): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
145 return set([]) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
146 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
147 quads = rx.operators.map(quadsFromValue)(parsed) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
148 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
149 dur = g.value(plan, ROOM['statementLifetime']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
150 if dur is not None: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
151 sec = parseDurationLiteral(dur) |
791 | 152 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
153 quads = quads.pipe( |
791 | 154 rx.operators.debounce(sec, loop), |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
155 rx.operators.map(emptyQuads), |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
156 rx.operators.merge(quads), |
780 | 157 ) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
158 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
159 return quads |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
160 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
161 def updateQuads(self, newGraphs): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
162 newQuads = set.union(*newGraphs) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
163 g = graphFromQuads(newQuads) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
164 log.debug(f'{self.uri} update to {len(newQuads)} statements') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
165 |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
166 for quad in newQuads: |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
167 meas = quad[0].split('/')[-1] |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
168 if meas.startswith('airQuality'): |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
169 where_prefix, type_ = meas[len('airQuality'):].split('door') |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
170 where = where_prefix + 'door' |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
171 metric = 'air' |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
172 tags = {'loc': where.lower(), 'type': type_.lower()} |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
173 val = quad[2].toPython() |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
174 if metric not in collectors: |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
175 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
176 |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
177 collectors[metric].labels(**tags).set(val) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
178 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
179 self.masterGraph.patchSubgraph(self.uri, g) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
180 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
181 def tagIdToUri(self, value: bytearray) -> URIRef: |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
182 justHex = value.decode('ascii').replace('-', '').lower() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
183 int(justHex, 16) # validate |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
184 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
185 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
186 def remap(self, parser, valueStr: str): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
187 g = self.config |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
188 value = Literal(valueStr) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
189 for entry in g.objects(parser, ROOM['map']): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
190 if value == g.value(entry, ROOM['from']): |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
191 return g.value(entry, ROOM['to']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
192 raise KeyError(value) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
193 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
194 |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
195 class Metrics(cyclone.web.RequestHandler): |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
196 |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
197 def get(self): |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
198 self.add_header('content-type', 'text/plain') |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
199 self.write(generate_latest(REGISTRY)) |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
200 |
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
201 |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
202 if __name__ == '__main__': |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
203 arg = docopt(""" |
733
9ca69f2be87b
more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents:
732
diff
changeset
|
204 Usage: mqtt_to_rdf.py [options] |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
205 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
206 -v Verbose |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
207 --cs=STR Only process config filenames with this substring |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
208 """) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
209 verboseLogging(arg['-v']) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
210 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
211 config = Graph() |
767 | 212 for fn in Path('.').glob('config_*.n3'): |
213 if not arg['--cs'] or str(arg['--cs']) in str(fn): | |
214 log.debug(f'loading {fn}') | |
215 config.parse(str(fn), format='n3') | |
216 else: | |
217 log.debug(f'skipping {fn}') | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
218 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
219 masterGraph = PatchableGraph() |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
220 |
780 | 221 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated |
222 internalMqtt = MqttClient(clientId='mqtt_to_rdf', | |
223 brokerHost='mosquitto-frontdoor.default.svc.cluster.local', | |
224 brokerPort=10210) | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
225 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
226 srcs = [] |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
227 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
228 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt)) |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
229 log.info(f'set up {len(srcs)} sources') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
230 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
231 port = 10018 |
780 | 232 reactor.listenTCP(port, |
233 cyclone.web.Application([ | |
234 (r"/()", cyclone.web.StaticFileHandler, { | |
235 "path": ".", | |
236 "default_filename": "index.html" | |
237 }), | |
238 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { | |
239 "path": "build" | |
240 }), | |
241 (r"/graph/mqtt", CycloneGraphHandler, { | |
242 'masterGraph': masterGraph | |
243 }), | |
244 (r"/graph/mqtt/events", CycloneGraphEventsHandler, { | |
245 'masterGraph': masterGraph | |
246 }), | |
796
fc74ae6d5d68
rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents:
793
diff
changeset
|
247 (r'/metrics', Metrics), |
780 | 248 ], |
249 mqtt=mqtt, | |
250 internalMqtt=internalMqtt, | |
251 masterGraph=masterGraph, | |
252 debug=arg['-v']), | |
718
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
253 interface='::') |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
254 log.warn('serving on %s', port) |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
255 |
edc14422f128
add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff
changeset
|
256 reactor.run() |