Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 780:729ab70c6212
reformat, update build
author | drewp@bigasterisk.com |
---|---|
date | Sat, 08 Aug 2020 15:06:22 -0700 |
parents | f3607a373a00 |
children | 13970578a443 |
comparison legend: equal | deleted | inserted | replaced
779:bad87b7dc608 | 780:729ab70c6212 |
---|---|
1 """ | 1 """ |
2 Subscribe to mqtt topics; generate RDF statements. | 2 Subscribe to mqtt topics; generate RDF statements. |
3 """ | 3 """ |
4 import json | 4 import json |
5 import sys | |
6 from pathlib import Path | 5 from pathlib import Path |
6 | |
7 import cyclone.web | |
7 from docopt import docopt | 8 from docopt import docopt |
8 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD | 9 from export_to_influxdb import InfluxExporter |
9 from rdflib.parser import StringInputSource | |
10 from rdflib.term import Node | |
11 from twisted.internet import reactor | |
12 import cyclone.web | |
13 import rx, rx.operators, rx.scheduler.eventloop | |
14 from greplin import scales | 10 from greplin import scales |
15 from greplin.scales.cyclonehandler import StatsHandler | 11 from greplin.scales.cyclonehandler import StatsHandler |
16 | |
17 from export_to_influxdb import InfluxExporter | |
18 from mqtt_client import MqttClient | 12 from mqtt_client import MqttClient |
19 | 13 from patchablegraph import ( |
20 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler | 14 CycloneGraphEventsHandler, |
21 from rdfdb.patch import Patch | 15 CycloneGraphHandler, |
16 PatchableGraph, | |
17 ) | |
22 from rdfdb.rdflibpatch import graphFromQuads | 18 from rdfdb.rdflibpatch import graphFromQuads |
19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD | |
20 from rdflib.term import Node | |
21 import rx | |
22 import rx.operators | |
23 import rx.scheduler.eventloop | |
23 from standardservice.logsetup import log, verboseLogging | 24 from standardservice.logsetup import log, verboseLogging |
24 from standardservice.scalessetup import gatherProcessStats | 25 from standardservice.scalessetup import gatherProcessStats |
26 from twisted.internet import reactor | |
25 | 27 |
26 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 28 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
27 | 29 |
28 gatherProcessStats() | 30 gatherProcessStats() |
31 | |
29 | 32 |
30 def parseDurationLiteral(lit: Literal) -> float: | 33 def parseDurationLiteral(lit: Literal) -> float: |
31 if lit.endswith('s'): | 34 if lit.endswith('s'): |
32 return float(lit.split('s')[0]) | 35 return float(lit.split('s')[0]) |
33 raise NotImplementedError(f'duration literal: {lit}') | 36 raise NotImplementedError(f'duration literal: {lit}') |
34 | 37 |
35 | 38 |
36 class MqttStatementSource: | 39 class MqttStatementSource: |
40 | |
37 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): | 41 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): |
38 self.uri = uri | 42 self.uri = uri |
39 self.config = config | 43 self.config = config |
40 self.masterGraph = masterGraph | 44 self.masterGraph = masterGraph |
41 self.mqtt = mqtt # deprecated | 45 self.mqtt = mqtt # deprecated |
42 self.internalMqtt = internalMqtt | 46 self.internalMqtt = internalMqtt |
43 self.influx = influx | 47 self.influx = influx |
44 | 48 |
45 self.mqttTopic = self.topicFromConfig(self.config) | 49 self.mqttTopic = self.topicFromConfig(self.config) |
46 log.debug(f'new mqttTopic {self.mqttTopic}') | 50 log.debug(f'new mqttTopic {self.mqttTopic}') |
47 | 51 |
48 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') | 52 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') |
49 scales.init(self, statPath) | 53 scales.init(self, statPath) |
50 self._mqttStats = scales.collection( | 54 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) |
51 statPath + '/incoming', scales.IntStat('count'), | |
52 scales.RecentFpsStat('fps')) | |
53 | 55 |
54 rawBytes = self.subscribeMqtt(self.mqttTopic) | 56 rawBytes = self.subscribeMqtt(self.mqttTopic) |
55 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) | 57 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) |
56 parsed = self.getParser()(rawBytes) | 58 parsed = self.getParser()(rawBytes) |
57 | 59 |
58 g = self.config | 60 g = self.config |
59 for conv in g.items(g.value(self.uri, ROOM['conversions'])): | 61 for conv in g.items(g.value(self.uri, ROOM['conversions'])): |
60 parsed = self.conversionStep(conv)(parsed) | 62 parsed = self.conversionStep(conv)(parsed) |
61 | 63 |
62 outputQuadsSets = rx.combine_latest( | 64 outputQuadsSets = rx.combine_latest( |
63 *[self.makeQuads(parsed, plan) | 65 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) |
64 for plan in g.objects(self.uri, ROOM['graphStatements'])]) | |
65 | 66 |
66 outputQuadsSets.subscribe_(self.updateQuads) | 67 outputQuadsSets.subscribe_(self.updateQuads) |
67 | 68 |
68 def topicFromConfig(self, config) -> bytes: | 69 def topicFromConfig(self, config) -> bytes: |
69 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) | 70 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) |
70 return b'/'.join(t.encode('ascii') for t in topicParts) | 71 return b'/'.join(t.encode('ascii') for t in topicParts) |
71 | |
72 | 72 |
73 def subscribeMqtt(self, topic): | 73 def subscribeMqtt(self, topic): |
74 # goal is to get everyone on the internal broker and eliminate this | 74 # goal is to get everyone on the internal broker and eliminate this |
75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt | 75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt |
76 return mqtt.subscribe(topic) | 76 return mqtt.subscribe(topic) |
109 else: | 109 else: |
110 raise NotImplementedError(conv) | 110 raise NotImplementedError(conv) |
111 | 111 |
112 def makeQuads(self, parsed, plan): | 112 def makeQuads(self, parsed, plan): |
113 g = self.config | 113 g = self.config |
114 | |
114 def quadsFromValue(valueNode): | 115 def quadsFromValue(valueNode): |
115 return set([ | 116 return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)]) |
116 (self.uri, | |
117 g.value(plan, ROOM['outputPredicate']), | |
118 valueNode, | |
119 self.uri) | |
120 ]) | |
121 | 117 |
122 def emptyQuads(element): | 118 def emptyQuads(element): |
123 return set([]) | 119 return set([]) |
124 | 120 |
125 quads = rx.operators.map(quadsFromValue)(parsed) | 121 quads = rx.operators.map(quadsFromValue)(parsed) |
129 sec = parseDurationLiteral(dur) | 125 sec = parseDurationLiteral(dur) |
130 quads = quads.pipe( | 126 quads = quads.pipe( |
131 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), | 127 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), |
132 rx.operators.map(emptyQuads), | 128 rx.operators.map(emptyQuads), |
133 rx.operators.merge(quads), | 129 rx.operators.merge(quads), |
134 ) | 130 ) |
135 | 131 |
136 return quads | 132 return quads |
137 | 133 |
138 def updateQuads(self, newGraphs): | 134 def updateQuads(self, newGraphs): |
139 newQuads = set.union(*newGraphs) | 135 newQuads = set.union(*newGraphs) |
175 else: | 171 else: |
176 log.debug(f'skipping {fn}') | 172 log.debug(f'skipping {fn}') |
177 | 173 |
178 masterGraph = PatchableGraph() | 174 masterGraph = PatchableGraph() |
179 | 175 |
180 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', | 176 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated |
181 brokerPort=1883) # deprecated | 177 internalMqtt = MqttClient(clientId='mqtt_to_rdf', |
182 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', | 178 brokerHost='mosquitto-frontdoor.default.svc.cluster.local', |
183 brokerPort=10010) | 179 brokerPort=10210) |
184 influx = InfluxExporter(config) | 180 influx = InfluxExporter(config) |
185 | 181 |
186 srcs = [] | 182 srcs = [] |
187 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): | 183 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): |
188 srcs.append(MqttStatementSource(src, config, masterGraph, | 184 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) |
189 mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) | |
190 log.info(f'set up {len(srcs)} sources') | 185 log.info(f'set up {len(srcs)} sources') |
191 | 186 |
192 port = 10018 | 187 port = 10018 |
193 reactor.listenTCP(port, cyclone.web.Application([ | 188 reactor.listenTCP(port, |
194 (r"/()", cyclone.web.StaticFileHandler, | 189 cyclone.web.Application([ |
195 {"path": ".", "default_filename": "index.html"}), | 190 (r"/()", cyclone.web.StaticFileHandler, { |
196 (r"/build/(bundle.js)", | 191 "path": ".", |
197 cyclone.web.StaticFileHandler, {"path": "build"}), | 192 "default_filename": "index.html" |
198 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), | 193 }), |
199 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), | 194 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { |
200 (r"/graph/mqtt/events", CycloneGraphEventsHandler, | 195 "path": "build" |
201 {'masterGraph': masterGraph}), | 196 }), |
202 ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), | 197 (r'/stats/(.*)', StatsHandler, { |
198 'serverName': 'mqtt_to_rdf' | |
199 }), | |
200 (r"/graph/mqtt", CycloneGraphHandler, { | |
201 'masterGraph': masterGraph | |
202 }), | |
203 (r"/graph/mqtt/events", CycloneGraphEventsHandler, { | |
204 'masterGraph': masterGraph | |
205 }), | |
206 ], | |
207 mqtt=mqtt, | |
208 internalMqtt=internalMqtt, | |
209 masterGraph=masterGraph, | |
210 debug=arg['-v']), | |
203 interface='::') | 211 interface='::') |
204 log.warn('serving on %s', port) | 212 log.warn('serving on %s', port) |
205 | 213 |
206 reactor.run() | 214 reactor.run() |