comparison service/mqtt_to_rdf/rdf_from_mqtt.py @ 1532:7cc7700302c2

more service renaming; start a lot more serv.n3 job files Ignore-this: 635aaefc7bd2fa5558eefb8b3fc9ec75 darcs-hash:2c8b587cbefa4db427f9a82676abdb47e651187e
author drewp <drewp@bigasterisk.com>
date Thu, 06 Feb 2020 16:36:35 -0800
parents
children
comparison
equal deleted inserted replaced
1531:a3dc6a31590f 1532:7cc7700302c2
1 """
2 Subscribe to mqtt topics; generate RDF statements.
3 """
4 import json
5 import sys
6 from docopt import docopt
7 from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
8 from rdflib.parser import StringInputSource
9 from rdflib.term import Node
10 from twisted.internet import reactor
11 import cyclone.web
12 import rx, rx.operators, rx.scheduler.eventloop
13 from greplin import scales
14 from greplin.scales.cyclonehandler import StatsHandler
15
16 from export_to_influxdb import InfluxExporter
17 from mqtt_client import MqttClient
18
19 from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
20 from rdfdb.patch import Patch
21 from rdfdb.rdflibpatch import graphFromQuads
22 from standardservice.logsetup import log, verboseLogging
23 from standardservice.scalessetup import gatherProcessStats
24
25 ROOM = Namespace('http://projects.bigasterisk.com/room/')
26
27 gatherProcessStats()
28
29 def parseDurationLiteral(lit: Literal) -> float:
30 if lit.endswith('s'):
31 return float(lit.split('s')[0])
32 raise NotImplementedError(f'duration literal: {lit}')
33
34
35 class MqttStatementSource:
36 def __init__(self, uri, config, masterGraph, mqtt, influx):
37 self.uri = uri
38 self.config = config
39 self.masterGraph = masterGraph
40 self.mqtt = mqtt
41 self.influx = influx
42
43 self.mqttTopic = self.topicFromConfig(self.config)
44
45 statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
46 scales.init(self, statPath)
47 self._mqttStats = scales.collection(
48 statPath + '/incoming', scales.IntStat('count'),
49 scales.RecentFpsStat('fps'))
50
51
52 rawBytes = self.subscribeMqtt()
53 rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
54 parsed = self.getParser()(rawBytes)
55
56 g = self.config
57 for conv in g.items(g.value(self.uri, ROOM['conversions'])):
58 parsed = self.conversionStep(conv)(parsed)
59
60 outputQuadsSets = rx.combine_latest(
61 *[self.makeQuads(parsed, plan)
62 for plan in g.objects(self.uri, ROOM['graphStatements'])])
63
64 outputQuadsSets.subscribe_(self.updateQuads)
65
66 def topicFromConfig(self, config) -> bytes:
67 topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
68 return b'/'.join(t.encode('ascii') for t in topicParts)
69
70
71 def subscribeMqtt(self):
72 return self.mqtt.subscribe(self.mqttTopic)
73
74 def countIncomingMessage(self, _):
75 self._mqttStats.fps.mark()
76 self._mqttStats.count += 1
77
78 def getParser(self):
79 g = self.config
80 parser = g.value(self.uri, ROOM['parser'])
81 if parser == XSD.double:
82 return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
83 elif parser == ROOM['tagIdToUri']:
84 return rx.operators.map(self.tagIdToUri)
85 elif parser == ROOM['onOffBrightness']:
86 return rx.operators.map(lambda v: Literal(0.0 if v == b'OFF' else 1.0))
87 elif parser == ROOM['jsonBrightness']:
88 return rx.operators.map(self.parseJsonBrightness)
89 elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
90 return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
91 else:
92 raise NotImplementedError(parser)
93
94 def parseJsonBrightness(self, mqttValue: bytes):
95 msg = json.loads(mqttValue.decode('ascii'))
96 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
97
98 def conversionStep(self, conv: Node):
99 g = self.config
100 if conv == ROOM['celsiusToFarenheit']:
101 return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2)))
102 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
103 threshold = g.value(conv, ROOM['ignoreValueBelow'])
104 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
105 else:
106 raise NotImplementedError(conv)
107
108 def makeQuads(self, parsed, plan):
109 g = self.config
110 def quadsFromValue(valueNode):
111 return set([
112 (self.uri,
113 g.value(plan, ROOM['outputPredicate']),
114 valueNode,
115 self.uri)
116 ])
117
118 def emptyQuads(element):
119 return set([])
120
121 quads = rx.operators.map(quadsFromValue)(parsed)
122
123 dur = g.value(plan, ROOM['statementLifetime'])
124 if dur is not None:
125 sec = parseDurationLiteral(dur)
126 quads = quads.pipe(
127 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)),
128 rx.operators.map(emptyQuads),
129 rx.operators.merge(quads),
130 )
131
132 return quads
133
134 def updateQuads(self, newGraphs):
135 newQuads = set.union(*newGraphs)
136 g = graphFromQuads(newQuads)
137 log.debug(f'{self.uri} update to {len(newQuads)} statements')
138
139 self.influx.exportToInflux(newQuads)
140
141 self.masterGraph.patchSubgraph(self.uri, g)
142
143 def tagIdToUri(self, value: bytearray) -> URIRef:
144 justHex = value.decode('ascii').replace('-', '').lower()
145 int(justHex, 16) # validate
146 return URIRef(f'http://bigasterisk.com/rfidCard/{justHex}')
147
148 def remap(self, parser, valueStr: str):
149 g = self.config
150 value = Literal(valueStr)
151 for entry in g.objects(parser, ROOM['map']):
152 if value == g.value(entry, ROOM['from']):
153 return g.value(entry, ROOM['to'])
154 raise KeyError(value)
155
156
157 if __name__ == '__main__':
158 arg = docopt("""
159 Usage: rdf_from_mqtt.py [options]
160
161 -v Verbose
162 --cs=STR Only process config filenames with this substring
163 """)
164 verboseLogging(arg['-v'])
165
166 config = Graph()
167 for fn in [
168 "config_cardreader.n3",
169 "config_nightlight_ari.n3",
170 "config_bed_bar.n3",
171 "config_air_quality_indoor.n3",
172 "config_air_quality_outdoor.n3",
173 "config_living_lamps.n3",
174 "config_kitchen.n3",
175 ]:
176 if not arg['--cs'] or arg['--cs'] in fn:
177 config.parse(fn, format='n3')
178
179 masterGraph = PatchableGraph()
180
181 mqtt = MqttClient(clientId='rdf_from_mqtt', brokerHost='bang',
182 brokerPort=1883)
183 influx = InfluxExporter(config)
184
185 srcs = []
186 for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
187 srcs.append(MqttStatementSource(src, config, masterGraph, mqtt, influx))
188 log.info(f'set up {len(srcs)} sources')
189
190 port = 10018
191 reactor.listenTCP(port, cyclone.web.Application([
192 (r"/()", cyclone.web.StaticFileHandler,
193 {"path": ".", "default_filename": "index.html"}),
194 (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_from_mqtt'}),
195 (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
196 (r"/graph/mqtt/events", CycloneGraphEventsHandler,
197 {'masterGraph': masterGraph}),
198 ], mqtt=mqtt, masterGraph=masterGraph, debug=arg['-v']),
199 interface='::')
200 log.warn('serving on %s', port)
201
202 reactor.run()