comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 780:729ab70c6212

reformat, update build
author drewp@bigasterisk.com
date Sat, 08 Aug 2020 15:06:22 -0700
parents f3607a373a00
children 13970578a443
comparison
equal deleted inserted replaced
779:bad87b7dc608 780:729ab70c6212
1 """ 1 """
2 Subscribe to mqtt topics; generate RDF statements. 2 Subscribe to mqtt topics; generate RDF statements.
3 """ 3 """
4 import json 4 import json
5 import sys
6 from pathlib import Path 5 from pathlib import Path
6
7 import cyclone.web
7 from docopt import docopt 8 from docopt import docopt
8 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD 9 from export_to_influxdb import InfluxExporter
9 from rdflib.parser import StringInputSource
10 from rdflib.term import Node
11 from twisted.internet import reactor
12 import cyclone.web
13 import rx, rx.operators, rx.scheduler.eventloop
14 from greplin import scales 10 from greplin import scales
15 from greplin.scales.cyclonehandler import StatsHandler 11 from greplin.scales.cyclonehandler import StatsHandler
16
17 from export_to_influxdb import InfluxExporter
18 from mqtt_client import MqttClient 12 from mqtt_client import MqttClient
19 13 from patchablegraph import (
20 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler 14 CycloneGraphEventsHandler,
21 from rdfdb.patch import Patch 15 CycloneGraphHandler,
16 PatchableGraph,
17 )
22 from rdfdb.rdflibpatch import graphFromQuads 18 from rdfdb.rdflibpatch import graphFromQuads
19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD
20 from rdflib.term import Node
21 import rx
22 import rx.operators
23 import rx.scheduler.eventloop
23 from standardservice.logsetup import log, verboseLogging 24 from standardservice.logsetup import log, verboseLogging
24 from standardservice.scalessetup import gatherProcessStats 25 from standardservice.scalessetup import gatherProcessStats
26 from twisted.internet import reactor
25 27
26 ROOM = Namespace('http://projects.bigasterisk.com/room/') 28 ROOM = Namespace('http://projects.bigasterisk.com/room/')
27 29
28 gatherProcessStats() 30 gatherProcessStats()
31
29 32
30 def parseDurationLiteral(lit: Literal) -> float: 33 def parseDurationLiteral(lit: Literal) -> float:
31 if lit.endswith('s'): 34 if lit.endswith('s'):
32 return float(lit.split('s')[0]) 35 return float(lit.split('s')[0])
33 raise NotImplementedError(f'duration literal: {lit}') 36 raise NotImplementedError(f'duration literal: {lit}')
34 37
35 38
36 class MqttStatementSource: 39 class MqttStatementSource:
40
37 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): 41 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
38 self.uri = uri 42 self.uri = uri
39 self.config = config 43 self.config = config
40 self.masterGraph = masterGraph 44 self.masterGraph = masterGraph
41 self.mqtt = mqtt # deprecated 45 self.mqtt = mqtt # deprecated
42 self.internalMqtt = internalMqtt 46 self.internalMqtt = internalMqtt
43 self.influx = influx 47 self.influx = influx
44 48
45 self.mqttTopic = self.topicFromConfig(self.config) 49 self.mqttTopic = self.topicFromConfig(self.config)
46 log.debug(f'new mqttTopic {self.mqttTopic}') 50 log.debug(f'new mqttTopic {self.mqttTopic}')
47 51
48 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') 52 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
49 scales.init(self, statPath) 53 scales.init(self, statPath)
50 self._mqttStats = scales.collection( 54 self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
51 statPath + '/incoming', scales.IntStat('count'),
52 scales.RecentFpsStat('fps'))
53 55
54 rawBytes = self.subscribeMqtt(self.mqttTopic) 56 rawBytes = self.subscribeMqtt(self.mqttTopic)
55 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) 57 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
56 parsed = self.getParser()(rawBytes) 58 parsed = self.getParser()(rawBytes)
57 59
58 g = self.config 60 g = self.config
59 for conv in g.items(g.value(self.uri, ROOM['conversions'])): 61 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
60 parsed = self.conversionStep(conv)(parsed) 62 parsed = self.conversionStep(conv)(parsed)
61 63
62 outputQuadsSets = rx.combine_latest( 64 outputQuadsSets = rx.combine_latest(
63 *[self.makeQuads(parsed, plan) 65 *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
64 for plan in g.objects(self.uri, ROOM['graphStatements'])])
65 66
66 outputQuadsSets.subscribe_(self.updateQuads) 67 outputQuadsSets.subscribe_(self.updateQuads)
67 68
68 def topicFromConfig(self, config) -> bytes: 69 def topicFromConfig(self, config) -> bytes:
69 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) 70 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
70 return b'/'.join(t.encode('ascii') for t in topicParts) 71 return b'/'.join(t.encode('ascii') for t in topicParts)
71
72 72
73 def subscribeMqtt(self, topic): 73 def subscribeMqtt(self, topic):
74 # goal is to get everyone on the internal broker and eliminate this 74 # goal is to get everyone on the internal broker and eliminate this
75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt 75 mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
76 return mqtt.subscribe(topic) 76 return mqtt.subscribe(topic)
109 else: 109 else:
110 raise NotImplementedError(conv) 110 raise NotImplementedError(conv)
111 111
112 def makeQuads(self, parsed, plan): 112 def makeQuads(self, parsed, plan):
113 g = self.config 113 g = self.config
114
114 def quadsFromValue(valueNode): 115 def quadsFromValue(valueNode):
115 return set([ 116 return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
116 (self.uri,
117 g.value(plan, ROOM['outputPredicate']),
118 valueNode,
119 self.uri)
120 ])
121 117
122 def emptyQuads(element): 118 def emptyQuads(element):
123 return set([]) 119 return set([])
124 120
125 quads = rx.operators.map(quadsFromValue)(parsed) 121 quads = rx.operators.map(quadsFromValue)(parsed)
129 sec = parseDurationLiteral(dur) 125 sec = parseDurationLiteral(dur)
130 quads = quads.pipe( 126 quads = quads.pipe(
131 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), 127 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)),
132 rx.operators.map(emptyQuads), 128 rx.operators.map(emptyQuads),
133 rx.operators.merge(quads), 129 rx.operators.merge(quads),
134 ) 130 )
135 131
136 return quads 132 return quads
137 133
138 def updateQuads(self, newGraphs): 134 def updateQuads(self, newGraphs):
139 newQuads = set.union(*newGraphs) 135 newQuads = set.union(*newGraphs)
175 else: 171 else:
176 log.debug(f'skipping {fn}') 172 log.debug(f'skipping {fn}')
177 173
178 masterGraph = PatchableGraph() 174 masterGraph = PatchableGraph()
179 175
180 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', 176 mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated
181 brokerPort=1883) # deprecated 177 internalMqtt = MqttClient(clientId='mqtt_to_rdf',
182 internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang', 178 brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
183 brokerPort=10010) 179 brokerPort=10210)
184 influx = InfluxExporter(config) 180 influx = InfluxExporter(config)
185 181
186 srcs = [] 182 srcs = []
187 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): 183 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
188 srcs.append(MqttStatementSource(src, config, masterGraph, 184 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
189 mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
190 log.info(f'set up {len(srcs)} sources') 185 log.info(f'set up {len(srcs)} sources')
191 186
192 port = 10018 187 port = 10018
193 reactor.listenTCP(port, cyclone.web.Application([ 188 reactor.listenTCP(port,
194 (r"/()", cyclone.web.StaticFileHandler, 189 cyclone.web.Application([
195 {"path": ".", "default_filename": "index.html"}), 190 (r"/()", cyclone.web.StaticFileHandler, {
196 (r"/build/(bundle.js)", 191 "path": ".",
197 cyclone.web.StaticFileHandler, {"path": "build"}), 192 "default_filename": "index.html"
198 (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}), 193 }),
199 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}), 194 (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
200 (r"/graph/mqtt/events", CycloneGraphEventsHandler, 195 "path": "build"
201 {'masterGraph': masterGraph}), 196 }),
202 ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']), 197 (r'/stats/(.*)', StatsHandler, {
198 'serverName': 'mqtt_to_rdf'
199 }),
200 (r"/graph/mqtt", CycloneGraphHandler, {
201 'masterGraph': masterGraph
202 }),
203 (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
204 'masterGraph': masterGraph
205 }),
206 ],
207 mqtt=mqtt,
208 internalMqtt=internalMqtt,
209 masterGraph=masterGraph,
210 debug=arg['-v']),
203 interface='::') 211 interface='::')
204 log.warn('serving on %s', port) 212 log.warn('serving on %s', port)
205 213
206 reactor.run() 214 reactor.run()