annotate service/mqtt_to_rdf/mqtt_to_rdf.py @ 797:a3e430b39177

reformat
author drewp@bigasterisk.com
date Tue, 29 Dec 2020 20:55:24 -0800
parents fc74ae6d5d68
children cdc76c84e3e2
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
1 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
2 Subscribe to mqtt topics; generate RDF statements.
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
3 """
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
4 import json
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
5 from pathlib import Path
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
6 from typing import Callable, cast
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
7
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
8 import cyclone.web
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
9 import prometheus_client
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
10 import rx
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
11 import rx.operators
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
12 import rx.scheduler.eventloop
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
13 from docopt import docopt
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
14 from mqtt_client import MqttClient
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
15 from patchablegraph import (CycloneGraphEventsHandler, CycloneGraphHandler, PatchableGraph)
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
16 from prometheus_client import Counter, Gauge, Histogram, Summary
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
17 from prometheus_client.exposition import generate_latest
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
18 from prometheus_client.registry import REGISTRY
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
19 from rdfdb.rdflibpatch import graphFromQuads
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
20 from rdflib import RDF, XSD, Graph, Literal, Namespace, URIRef
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
21 from rdflib.term import Node
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
22 from rx.core import Observable
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
23 from standardservice.logsetup import log, verboseLogging
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
24 from twisted.internet import reactor
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
25
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
26 from button_events import button_events
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
27
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
28 ROOM = Namespace('http://projects.bigasterisk.com/room/')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
29
797
a3e430b39177 reformat
drewp@bigasterisk.com
parents: 796
diff changeset
30 collectors = {}
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
31
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
32
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
33 def parseDurationLiteral(lit: Literal) -> float:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
34 if lit.endswith('s'):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
35 return float(lit.split('s')[0])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
36 raise NotImplementedError(f'duration literal: {lit}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
37
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
38
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
39 class MqttStatementSource:
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
40
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
41 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt):
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
42 self.uri = uri
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
43 self.config = config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
44 self.masterGraph = masterGraph
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
45 self.mqtt = mqtt # deprecated
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
46 self.internalMqtt = internalMqtt
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
47
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
48 self.mqttTopic = self.topicFromConfig(self.config)
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
49 log.debug(f'new mqttTopic {self.mqttTopic}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
50
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
51 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
52 #scales.init(self, statPath)
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
53 #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
54
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
55 rawBytes = self.subscribeMqtt(self.mqttTopic)
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
56 rawBytes = self.addFilters(rawBytes)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
57 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
58 parsed = self.getParser()(rawBytes)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
59
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
60 g = self.config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
61 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
62 parsed = self.conversionStep(conv)(parsed)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
63
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
64 outputQuadsSets = rx.combine_latest(
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
65 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
66
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
67 outputQuadsSets.subscribe_(self.updateQuads)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
68
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
69 def addFilters(self, rawBytes):
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
70 jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
71 if jsonEq:
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
72 required = json.loads(jsonEq.toPython())
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
73
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
74 def eq(jsonBytes):
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
75 msg = json.loads(jsonBytes.decode('ascii'))
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
76 return msg == required
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
77
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
78 rawBytes = rx.operators.filter(eq)(rawBytes)
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
79 return rawBytes
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
80
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
81 def topicFromConfig(self, config) -> bytes:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
82 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
83 return b'/'.join(t.encode('ascii') for t in topicParts)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
84
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
85 def subscribeMqtt(self, topic):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
86 # goal is to get everyone on the internal broker and eliminate this
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
87 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
88 return mqtt.subscribe(topic)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
89
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
90 def countIncomingMessage(self, _):
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
91 pass #self._mqttStats.fps.mark()
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
92 #self._mqttStats.count += 1
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
93
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
94 def getParser(self):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
95 g = self.config
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
96
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
97 parser = g.value(self.uri, ROOM['parser'])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
98 if parser == XSD.double:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
99 return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
100 elif parser == ROOM['tagIdToUri']:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
101 return rx.operators.map(self.tagIdToUri)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
102 elif parser == ROOM['onOffBrightness']:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
103 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
104 elif parser == ROOM['jsonBrightness']:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
105 return rx.operators.map(self.parseJsonBrightness)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
106 elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
107 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
108 elif parser == ROOM['rfCode']:
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
109 return rx.operators.map(self.parseJsonRfCode)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
110 else:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
111 raise NotImplementedError(parser)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
112
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
113 def parseJsonBrightness(self, mqttValue: bytes):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
114 msg = json.loads(mqttValue.decode('ascii'))
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
115 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
116
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
117 def parseJsonRfCode(self, mqttValue: bytes):
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
118 msg = json.loads(mqttValue.decode('ascii'))
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
119 return Literal('%08x%08x' % (msg['code0'], msg['code1']))
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
120
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
121 def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
122 g = self.config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
123 if conv == ROOM['celsiusToFarenheit']:
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
124
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
125 def c2f(value: Literal) -> Node:
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
126 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
127
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
128 return rx.operators.map(c2f)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
129 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
130 threshold = g.value(conv, ROOM['ignoreValueBelow'])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
131 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
793
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
132 elif conv == ROOM['buttonPress']:
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
133 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
c3e3bd5dfa0b add rf button mqtt message processing
drewp@bigasterisk.com
parents: 791
diff changeset
134 return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
135 else:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
136 raise NotImplementedError(conv)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
137
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
138 def makeQuads(self, parsed, plan):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
139 g = self.config
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
140
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
141 def quadsFromValue(valueNode):
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
142 return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
143
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
144 def emptyQuads(element):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
145 return set([])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
146
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
147 quads = rx.operators.map(quadsFromValue)(parsed)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
148
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
149 dur = g.value(plan, ROOM['statementLifetime'])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
150 if dur is not None:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
151 sec = parseDurationLiteral(dur)
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
152 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
153 quads = quads.pipe(
791
8f4e814eb1ab cleanup
drewp@bigasterisk.com
parents: 787
diff changeset
154 rx.operators.debounce(sec, loop),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
155 rx.operators.map(emptyQuads),
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
156 rx.operators.merge(quads),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
157 )
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
158
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
159 return quads
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
160
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
161 def updateQuads(self, newGraphs):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
162 newQuads = set.union(*newGraphs)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
163 g = graphFromQuads(newQuads)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
164 log.debug(f'{self.uri} update to {len(newQuads)} statements')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
165
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
166 for quad in newQuads:
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
167 meas = quad[0].split('/')[-1]
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
168 if meas.startswith('airQuality'):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
169 where_prefix, type_ = meas[len('airQuality'):].split('door')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
170 where = where_prefix + 'door'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
171 metric = 'air'
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
172 tags = {'loc': where.lower(), 'type': type_.lower()}
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
173 val = quad[2].toPython()
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
174 if metric not in collectors:
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
175 collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
176
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
177 collectors[metric].labels(**tags).set(val)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
178
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
179 self.masterGraph.patchSubgraph(self.uri, g)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
180
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
181 def tagIdToUri(self, value: bytearray) -> URIRef:
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
182 justHex = value.decode('ascii').replace('-', '').lower()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
183 int(justHex, 16) # validate
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
184 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
185
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
186 def remap(self, parser, valueStr: str):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
187 g = self.config
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
188 value = Literal(valueStr)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
189 for entry in g.objects(parser, ROOM['map']):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
190 if value == g.value(entry, ROOM['from']):
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
191 return g.value(entry, ROOM['to'])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
192 raise KeyError(value)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
193
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
194
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
195 class Metrics(cyclone.web.RequestHandler):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
196
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
197 def get(self):
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
198 self.add_header('content-type', 'text/plain')
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
199 self.write(generate_latest(REGISTRY))
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
200
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
201
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
202 if __name__ == '__main__':
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
203 arg = docopt("""
733
9ca69f2be87b more mqtt_to_rdf renames. bring in basic LitElement setup for the debug page
drewp@bigasterisk.com
parents: 732
diff changeset
204 Usage: mqtt_to_rdf.py [options]
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
205
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
206 -v Verbose
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
207 --cs=STR Only process config filenames with this substring
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
208 """)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
209 verboseLogging(arg['-v'])
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
210
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
211 config = Graph()
767
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
212 for fn in Path('.').glob('config_*.n3'):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
213 if not arg['--cs'] or str(arg['--cs']) in str(fn):
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
214 log.debug(f'loading {fn}')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
215 config.parse(str(fn), format='n3')
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
216 else:
f3607a373a00 front door support on another broker
drewp@bigasterisk.com
parents: 733
diff changeset
217 log.debug(f'skipping {fn}')
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
218
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
219 masterGraph = PatchableGraph()
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
220
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
221 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
222 internalMqtt = MqttClient(clientId='mqtt_to_rdf',
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
223 brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
224 brokerPort=10210)
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
225
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
226 srcs = []
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
227 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
228 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt))
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
229 log.info(f'set up {len(srcs)} sources')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
230
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
231 port = 10018
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
232 reactor.listenTCP(port,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
233 cyclone.web.Application([
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
234 (r"/()", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
235 "path": ".",
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
236 "default_filename": "index.html"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
237 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
238 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
239 "path": "build"
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
240 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
241 (r"/graph/mqtt", CycloneGraphHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
242 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
243 }),
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
244 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
245 'masterGraph': masterGraph
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
246 }),
796
fc74ae6d5d68 rm influx (was broken); write some data to prometheus
drewp@bigasterisk.com
parents: 793
diff changeset
247 (r'/metrics', Metrics),
780
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
248 ],
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
249 mqtt=mqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
250 internalMqtt=internalMqtt,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
251 masterGraph=masterGraph,
729ab70c6212 reformat, update build
drewp@bigasterisk.com
parents: 767
diff changeset
252 debug=arg['-v']),
718
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
253 interface='::')
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
254 log.warn('serving on %s', port)
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
255
edc14422f128 add rdf_from_mqtt, though at the moment the graph urls may not be in sync and the reqs have just been updated
drewp@bigasterisk.com
parents:
diff changeset
256 reactor.run()