comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 777:df7035db28f1

reformat
author drewp@bigasterisk.com
date Sat, 08 Aug 2020 13:14:49 -0700
parents 8fa420250799
children acf58b83022f
comparison
equal deleted inserted replaced
776:8fa420250799 777:df7035db28f1
3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt. 3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt.
4 4
5 This is like light9/bin/collector. 5 This is like light9/bin/collector.
6 """ 6 """
7 import json 7 import json
8 from mqtt_client import MqttClient 8
9 import cyclone.web
10 from cycloneerr import PrettyErrorHandler
9 from docopt import docopt 11 from docopt import docopt
10 from rdflib import Namespace
11 from twisted.internet import reactor
12 import cyclone.web
13 from greplin import scales 12 from greplin import scales
14 from greplin.scales.cyclonehandler import StatsHandler 13 from greplin.scales.cyclonehandler import StatsHandler
14 from mqtt_client import MqttClient
15 from rdflib import Namespace
16 from standardservice.logsetup import log, verboseLogging
17 from twisted.internet import reactor
15 18
16 from standardservice.logsetup import log, verboseLogging
17 import rdf_over_http 19 import rdf_over_http
18 from cycloneerr import PrettyErrorHandler
19 20
20 ROOM = Namespace('http://projects.bigasterisk.com/room/') 21 ROOM = Namespace('http://projects.bigasterisk.com/room/')
21 22
22 STATS = scales.collection('/root', 23 STATS = scales.collection(
23 scales.PmfStat('putRequests'), 24 '/root',
24 scales.PmfStat('statement'), 25 scales.PmfStat('putRequests'),
25 scales.PmfStat('mqttPublish'), 26 scales.PmfStat('statement'),
27 scales.PmfStat('mqttPublish'),
26 ) 28 )
27 29
28 devs = { 30 devs = {
29 ROOM['kitchenLight']: { 31 ROOM['kitchenLight']: {
30 'root': 'h801_skylight', 32 'root': 'h801_skylight',
57 'values': 'theaterOutputs', 59 'values': 'theaterOutputs',
58 }, 60 },
59 ROOM['bedHeadboard']: { 61 ROOM['bedHeadboard']: {
60 'root': 'bed/light/headboard/command', 62 'root': 'bed/light/headboard/command',
61 }, 63 },
62 #-t theater_blaster/ir_out -m 'input_game' 64 #-t theater_blaster/ir_out -m 'input_game'
63 #-t theater_blaster/ir_out -m 'input_bd' 65 #-t theater_blaster/ir_out -m 'input_bd'
64 #-t theater_blaster/ir_out -m 'input_cbl' 66 #-t theater_blaster/ir_out -m 'input_cbl'
65 #-t theater_blaster/ir_out -m 'input_pc' 67 #-t theater_blaster/ir_out -m 'input_pc'
66 #-t theater_blaster/ir_out/volume_up -m '{"times":1}' 68 #-t theater_blaster/ir_out/volume_up -m '{"times":1}'
67 #-t theater_blaster/ir_out/volume_down -m '{"times":1}' 69 #-t theater_blaster/ir_out/volume_down -m '{"times":1}'
68 } 70 }
69 71
70 72
71 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): 73 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler):
72 @STATS.putRequests.time() 74 @STATS.putRequests.time()
73 def put(self): 75 def put(self):
74 for stmt in rdf_over_http.rdfStatementsFromRequest( 76 for stmt in rdf_over_http.rdfStatementsFromRequest(
75 self.request.arguments, 77 self.request.arguments, self.request.body,
76 self.request.body,
77 self.request.headers): 78 self.request.headers):
78 self._onStatement(stmt) 79 self._onStatement(stmt)
79 80
80 @STATS.statement.time() 81 @STATS.statement.time()
81 def _onStatement(self, stmt): 82 def _onStatement(self, stmt):
103 self._publish(topic=f'theater_blaster/ir_out/volume_{which}', 104 self._publish(topic=f'theater_blaster/ir_out/volume_{which}',
104 message=json.dumps({'timed': abs(delta)})) 105 message=json.dumps({'timed': abs(delta)}))
105 ignored = False 106 ignored = False
106 if stmt[0:2] == (dev, ROOM['color']): 107 if stmt[0:2] == (dev, ROOM['color']):
107 h = stmt[2].toPython() 108 h = stmt[2].toPython()
108 r,g,b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) 109 r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
109 self._publish(topic=attrs['root'], 110 self._publish(topic=attrs['root'],
110 message=json.dumps({'state': 'ON' if r or g or b else 'OFF', 111 message=json.dumps({
111 'color': {'r': r, 'g': g, 'b': b}, 112 'state':
112 'white_value': max(r, g, b)})) 113 'ON' if r or g or b else 'OFF',
113 ignored = false 114 'color': {
115 'r': r,
116 'g': g,
117 'b': b
118 },
119 'white_value':
120 max(r, g, b)
121 }))
122 ignored = False
114 if ignored: 123 if ignored:
115 log.warn("ignoring %s", stmt) 124 log.warn("ignoring %s", stmt)
116 125
117 def _publishOnOff(self, attrs, brightness): 126 def _publishOnOff(self, attrs, brightness):
118 msg = 'OFF' 127 msg = 'OFF'
119 if brightness > 0: 128 if brightness > 0:
120 msg = 'ON' 129 msg = 'ON'
121 self._publish(topic=attrs['root'], message=msg) 130 self._publish(topic=attrs['root'], message=msg)
122 131
123 def _publishRgbw(self, attrs, brightness): 132 def _publishRgbw(self, attrs, brightness):
124 for chan, scale in [('w1', 1), 133 for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]:
125 ('r', 1), 134 self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command",
126 ('g', .8), 135 messageJson={
127 ('b', .8)]: 136 'state': 'ON',
128 self._publish( 137 'brightness': int(brightness * 255)
129 topic=f"{attrs['root']}/light/kit_{chan}/command", 138 })
130 messageJson={
131 'state': 'ON',
132 'brightness': int(brightness * 255)
133 })
134 139
135 def _publishFrontScreenText(self, stmt): 140 def _publishFrontScreenText(self, stmt):
136 ignored = True 141 ignored = True
137 for line in ['line1', 'line2', 'line3', 'line4']: 142 for line in ['line1', 'line2', 'line3', 'line4']:
138 if stmt[1] == ROOM[line]: 143 if stmt[1] == ROOM[line]:
141 b'frontwindow/%s' % line.encode('ascii'), 146 b'frontwindow/%s' % line.encode('ascii'),
142 stmt[2].toPython()) 147 stmt[2].toPython())
143 return ignored 148 return ignored
144 149
145 @STATS.mqttPublish.time() 150 @STATS.mqttPublish.time()
146 def _publish(self, topic: str, messageJson: object=None, 151 def _publish(self,
147 message: str=None): 152 topic: str,
153 messageJson: object = None,
154 message: str = None):
148 log.debug(f'mqtt.publish {topic} {message} {messageJson}') 155 log.debug(f'mqtt.publish {topic} {message} {messageJson}')
149 if messageJson is not None: 156 if messageJson is not None:
150 message = json.dumps(messageJson) 157 message = json.dumps(messageJson)
151 self.settings.mqtt.publish( 158 self.settings.mqtt.publish(topic.encode('ascii'),
152 topic.encode('ascii'), 159 message.encode('ascii'))
153 message.encode('ascii'))
154 160
155 161
156 if __name__ == '__main__': 162 if __name__ == '__main__':
157 arg = docopt(""" 163 arg = docopt("""
158 Usage: rdf_to_mqtt.py [options] 164 Usage: rdf_to_mqtt.py [options]
159 165
160 -v Verbose 166 -v Verbose
161 """) 167 """)
162 verboseLogging(arg['-v']) 168 verboseLogging(arg['-v'])
163 169
164 mqtt = MqttClient(clientId='rdf_to_mqtt', brokerPort=1883) 170 mqtt = MqttClient(clientId='rdf_to_mqtt',
171 brokerHost='mosquitto-ext.default.svc.cluster.local',
172 brokerPort=1883)
165 173
166 port = 10008 174 port = 10008
167 reactor.listenTCP(port, cyclone.web.Application([ 175 reactor.listenTCP(port,
168 (r"/()", cyclone.web.StaticFileHandler, 176 cyclone.web.Application([
169 {"path": ".", "default_filename": "index.html"}), 177 (r"/()", cyclone.web.StaticFileHandler, {
170 (r'/output', OutputPage), 178 "path": ".",
171 (r'/stats/(.*)', StatsHandler, {'serverName': 'rdf_to_mqtt'}), 179 "default_filename": "index.html"
172 ], mqtt=mqtt, debug=arg['-v']), interface='::') 180 }),
181 (r'/output', OutputPage),
182 (r'/stats/(.*)', StatsHandler, {
183 'serverName': 'rdf_to_mqtt'
184 }),
185 ],
186 mqtt=mqtt,
187 debug=arg['-v']),
188 interface='::')
173 log.warn('serving on %s', port) 189 log.warn('serving on %s', port)
174 190
175 reactor.run() 191 reactor.run()