comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 767:f3607a373a00

front door support on another broker Ignore-this: 88794f304d22ed9f4f9fc92d84d3ae3c
author drewp@bigasterisk.com
date Sat, 02 May 2020 15:07:03 -0700
parents 9ca69f2be87b
children 729ab70c6212
comparison
equal deleted inserted replaced
766:ac9c516d3973 767:f3607a373a00
1 """ 1 """
2 Subscribe to mqtt topics; generate RDF statements. 2 Subscribe to mqtt topics; generate RDF statements.
3 """ 3 """
4 import json 4 import json
5 import sys 5 import sys
6 from pathlib import Path
6 from docopt import docopt 7 from docopt import docopt
7 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD 8 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
8 from rdflib.parser import StringInputSource 9 from rdflib.parser import StringInputSource
9 from rdflib.term import Node 10 from rdflib.term import Node
10 from twisted.internet import reactor 11 from twisted.internet import reactor
31 return float(lit.split('s')[0]) 32 return float(lit.split('s')[0])
32 raise NotImplementedError(f'duration literal: {lit}') 33 raise NotImplementedError(f'duration literal: {lit}')
33 34
34 35
35 class MqttStatementSource: 36 class MqttStatementSource:
36 def __init__(self, uri, config, masterGraph, mqtt, influx): 37 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
37 self.uri = uri 38 self.uri = uri
38 self.config = config 39 self.config = config
39 self.masterGraph = masterGraph 40 self.masterGraph = masterGraph
40 self.mqtt = mqtt 41 self.mqtt = mqtt # deprecated
42 self.internalMqtt = internalMqtt
41 self.influx = influx 43 self.influx = influx
42 44
43 self.mqttTopic = self.topicFromConfig(self.config) 45 self.mqttTopic = self.topicFromConfig(self.config)
46 log.debug(f'new mqttTopic {self.mqttTopic}')
44 47
45 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') 48 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
46 scales.init(self, statPath) 49 scales.init(self, statPath)
47 self._mqttStats = scales.collection( 50 self._mqttStats = scales.collection(
48 statPath + '/incoming', scales.IntStat('count'), 51 statPath + '/incoming', scales.IntStat('count'),
49 scales.RecentFpsStat('fps')) 52 scales.RecentFpsStat('fps'))
50 53
51 54 rawBytes = self.subscribeMqtt(self.mqttTopic)
52 rawBytes = self.subscribeMqtt()
53 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) 55 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
54 parsed = self.getParser()(rawBytes) 56 parsed = self.getParser()(rawBytes)
55 57
56 g = self.config 58 g = self.config
57 for conv in g.items(g.value(self.uri, ROOM['conversions'])): 59 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
66 def topicFromConfig(self, config) -> bytes: 68 def topicFromConfig(self, config) -> bytes:
67 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) 69 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
68 return b'/'.join(t.encode('ascii') for t in topicParts) 70 return b'/'.join(t.encode('ascii') for t in topicParts)
69 71
70 72
71 def subscribeMqtt(self): 73 def subscribeMqtt(self, topic):
72 return self.mqtt.subscribe(self.mqttTopic) 74 # goal is to get everyone on the internal broker and eliminate this
75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
76 return mqtt.subscribe(topic)
73 77
74 def countIncomingMessage(self, _): 78 def countIncomingMessage(self, _):
75 self._mqttStats.fps.mark() 79 self._mqttStats.fps.mark()
76 self._mqttStats.count += 1 80 self._mqttStats.count += 1
77 81
162 --cs=STR Only process config filenames with this substring 166 --cs=STR Only process config filenames with this substring
163 """) 167 """)
164 verboseLogging(arg['-v']) 168 verboseLogging(arg['-v'])
165 169
166 config = Graph() 170 config = Graph()
167 for fn in [ 171 for fn in Path('.').glob('config_*.n3'):
168 "config_cardreader.n3", 172 if not arg['--cs'] or str(arg['--cs']) in str(fn):
169 "config_nightlight_ari.n3", 173 log.debug(f'loading {fn}')
170 "config_bed_bar.n3", 174 config.parse(str(fn), format='n3')
171 "config_air_quality_indoor.n3", 175 else:
172 "config_air_quality_outdoor.n3", 176 log.debug(f'skipping {fn}')
173 "config_living_lamps.n3",
174 "config_kitchen.n3",
175 ]:
176 if not arg['--cs'] or arg['--cs'] in fn:
177 config.parse(fn, format='n3')
178 177
179 masterGraph = PatchableGraph() 178 masterGraph = PatchableGraph()
180 179
181 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', 180 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
182 brokerPort=1883) 181 brokerPort=1883) # deprecated
182 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
183 brokerPort=10010)
183 influx = InfluxExporter(config) 184 influx = InfluxExporter(config)
184 185
185 srcs = [] 186 srcs = []
186 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): 187 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
187 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx)) 188 srcs.append(MqttStatementSource(src, config, masterGraph,
189 mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
188 log.info(f'set up {len(srcs)} sources') 190 log.info(f'set up {len(srcs)} sources')
189 191
190 port = 10018 192 port = 10018
191 reactor.listenTCP(port, cyclone.web.Application([ 193 reactor.listenTCP(port, cyclone.web.Application([
192 (r"/()", cyclone.web.StaticFileHandler, 194 (r"/()", cyclone.web.StaticFileHandler,
195 cyclone.web.StaticFileHandler, {"path": "build"}), 197 cyclone.web.StaticFileHandler, {"path": "build"}),
196 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), 198 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}),
197 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), 199 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
198 (r"/graph/mqtt/events", CycloneGraphEventsHandler, 200 (r"/graph/mqtt/events", CycloneGraphEventsHandler,
199 {'masterGraph': masterGraph}), 201 {'masterGraph': masterGraph}),
200 ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']), 202 ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']),
201 interface='::') 203 interface='::')
202 log.warn('serving on %s', port) 204 log.warn('serving on %s', port)
203 205
204 reactor.run() 206 reactor.run()