Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 767:f3607a373a00
front door support on another broker
Ignore-this: 88794f304d22ed9f4f9fc92d84d3ae3c
author | drewp@bigasterisk.com |
---|---|
date | Sat, 02 May 2020 15:07:03 -0700 |
parents | 9ca69f2be87b |
children | 729ab70c6212 |
comparison
legend: equal | deleted | inserted | replaced
766:ac9c516d3973 | 767:f3607a373a00 |
---|---|
1 """ | 1 """ |
2 Subscribe to mqtt topics; generate RDF statements. | 2 Subscribe to mqtt topics; generate RDF statements. |
3 """ | 3 """ |
4 import json | 4 import json |
5 import sys | 5 import sys |
6 from pathlib import Path | |
6 from docopt import docopt | 7 from docopt import docopt |
7 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD | 8 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD |
8 from rdflib.parser import StringInputSource | 9 from rdflib.parser import StringInputSource |
9 from rdflib.term import Node | 10 from rdflib.term import Node |
10 from twisted.internet import reactor | 11 from twisted.internet import reactor |
31 return float(lit.split('s')[0]) | 32 return float(lit.split('s')[0]) |
32 raise NotImplementedError(f'duration literal: {lit}') | 33 raise NotImplementedError(f'duration literal: {lit}') |
33 | 34 |
34 | 35 |
35 class MqttStatementSource: | 36 class MqttStatementSource: |
36 def __init__(self, uri, config, masterGraph, mqtt, influx): | 37 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): |
37 self.uri = uri | 38 self.uri = uri |
38 self.config = config | 39 self.config = config |
39 self.masterGraph = masterGraph | 40 self.masterGraph = masterGraph |
40 self.mqtt = mqtt | 41 self.mqtt = mqtt # deprecated |
42 self.internalMqtt = internalMqtt | |
41 self.influx = influx | 43 self.influx = influx |
42 | 44 |
43 self.mqttTopic = self.topicFromConfig(self.config) | 45 self.mqttTopic = self.topicFromConfig(self.config) |
46 log.debug(f'new mqttTopic {self.mqttTopic}') | |
44 | 47 |
45 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') | 48 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') |
46 scales.init(self, statPath) | 49 scales.init(self, statPath) |
47 self._mqttStats = scales.collection( | 50 self._mqttStats = scales.collection( |
48 statPath + '/incoming', scales.IntStat('count'), | 51 statPath + '/incoming', scales.IntStat('count'), |
49 scales.RecentFpsStat('fps')) | 52 scales.RecentFpsStat('fps')) |
50 | 53 |
51 | 54 rawBytes = self.subscribeMqtt(self.mqttTopic) |
52 rawBytes = self.subscribeMqtt() | |
53 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) | 55 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) |
54 parsed = self.getParser()(rawBytes) | 56 parsed = self.getParser()(rawBytes) |
55 | 57 |
56 g = self.config | 58 g = self.config |
57 for conv in g.items(g.value(self.uri, ROOM['conversions'])): | 59 for conv in g.items(g.value(self.uri, ROOM['conversions'])): |
66 def topicFromConfig(self, config) -> bytes: | 68 def topicFromConfig(self, config) -> bytes: |
67 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) | 69 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
68 return b'/'.join(t.encode('ascii') for t in topicParts) | 70 return b'/'.join(t.encode('ascii') for t in topicParts) |
69 | 71 |
70 | 72 |
71 def subscribeMqtt(self): | 73 def subscribeMqtt(self, topic): |
72 return self.mqtt.subscribe(self.mqttTopic) | 74 # goal is to get everyone on the internal broker and eliminate this |
75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt | |
76 return mqtt.subscribe(topic) | |
73 | 77 |
74 def countIncomingMessage(self, _): | 78 def countIncomingMessage(self, _): |
75 self._mqttStats.fps.mark() | 79 self._mqttStats.fps.mark() |
76 self._mqttStats.count += 1 | 80 self._mqttStats.count += 1 |
77 | 81 |
162 --cs=STR Only process config filenames with this substring | 166 --cs=STR Only process config filenames with this substring |
163 """) | 167 """) |
164 verboseLogging(arg['-v']) | 168 verboseLogging(arg['-v']) |
165 | 169 |
166 config = Graph() | 170 config = Graph() |
167 for fn in [ | 171 for fn in Path('.').glob('config_*.n3'): |
168 "config_cardreader.n3", | 172 if not arg['--cs'] or str(arg['--cs']) in str(fn): |
169 "config_nightlight_ari.n3", | 173 log.debug(f'loading {fn}') |
170 "config_bed_bar.n3", | 174 config.parse(str(fn), format='n3') |
171 "config_air_quality_indoor.n3", | 175 else: |
172 "config_air_quality_outdoor.n3", | 176 log.debug(f'skipping {fn}') |
173 "config_living_lamps.n3", | |
174 "config_kitchen.n3", | |
175 ]: | |
176 if not arg['--cs'] or arg['--cs'] in fn: | |
177 config.parse(fn, format='n3') | |
178 | 177 |
179 masterGraph = PatchableGraph() | 178 masterGraph = PatchableGraph() |
180 | 179 |
181 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', | 180 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', |
182 brokerPort=1883) | 181 brokerPort=1883) # deprecated |
182 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', | |
183 brokerPort=10010) | |
183 influx = InfluxExporter(config) | 184 influx = InfluxExporter(config) |
184 | 185 |
185 srcs = [] | 186 srcs = [] |
186 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): | 187 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): |
187 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx)) | 188 srcs.append(MqttStatementSource(src, config, masterGraph, |
189 mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) | |
188 log.info(f'set up {len(srcs)} sources') | 190 log.info(f'set up {len(srcs)} sources') |
189 | 191 |
190 port = 10018 | 192 port = 10018 |
191 reactor.listenTCP(port, cyclone.web.Application([ | 193 reactor.listenTCP(port, cyclone.web.Application([ |
192 (r"/()", cyclone.web.StaticFileHandler, | 194 (r"/()", cyclone.web.StaticFileHandler, |
195 cyclone.web.StaticFileHandler, {"path": "build"}), | 197 cyclone.web.StaticFileHandler, {"path": "build"}), |
196 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), | 198 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), |
197 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), | 199 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), |
198 (r"/graph/mqtt/events", CycloneGraphEventsHandler, | 200 (r"/graph/mqtt/events", CycloneGraphEventsHandler, |
199 {'masterGraph': masterGraph}), | 201 {'masterGraph': masterGraph}), |
200 ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), | 202 ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), |
201 interface='::') | 203 interface='::') |
202 log.warn('serving on %s', port) | 204 log.warn('serving on %s', port) |
203 | 205 |
204 reactor.run() | 206 reactor.run() |