Mercurial > code > home > repos > homeauto
comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 1732:3f4b447d65f5
port to starlette/asyncio
author | drewp@bigasterisk.com |
---|---|
date | Mon, 10 Jul 2023 17:37:58 -0700 |
parents | 80b01d548b9c |
children | 09df2b4b886f |
comparison
Legend: equal, deleted, inserted, replaced
1731:35abc7656f0f | 1732:3f4b447d65f5 |
---|---|
2 We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"), | 2 We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"), |
3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt. | 3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt. |
4 | 4 |
5 This is like light9/bin/collector. | 5 This is like light9/bin/collector. |
6 """ | 6 """ |
7 import asyncio | |
7 import json | 8 import json |
9 import os | |
10 import time | |
8 | 11 |
9 import cyclone.web | 12 from prometheus_client import Counter, Gauge, Summary |
10 from cycloneerr import PrettyErrorHandler | |
11 from docopt import docopt | |
12 from greplin import scales | |
13 from greplin.scales.cyclonehandler import StatsHandler | |
14 from mqtt_client import MqttClient | |
15 from rdflib import Namespace | 13 from rdflib import Namespace |
16 from standardservice.logsetup import log, verboseLogging | 14 from starlette_exporter import PrometheusMiddleware, handle_metrics |
17 from twisted.internet import reactor | 15 from starlette.applications import Starlette |
16 from starlette.requests import Request | |
17 from starlette.responses import PlainTextResponse | |
18 from starlette.routing import Route | |
19 from starlette.staticfiles import StaticFiles | |
20 import aiomqtt | |
21 | |
18 from devs import devs | 22 from devs import devs |
19 import rdf_over_http | 23 import rdf_over_http |
20 | 24 |
25 # from victorialogger import log | |
26 import logging | |
27 | |
28 logging.basicConfig(level=logging.DEBUG) | |
29 log = logging.getLogger(__name__) | |
30 | |
21 ROOM = Namespace('http://projects.bigasterisk.com/room/') | 31 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
22 | 32 |
23 STATS = scales.collection( | 33 PUT_REQUESTS = Summary('put_requests', 'calls') |
24 '/root', | 34 STATEMENT = Summary('on_statement', 'calls') |
25 scales.PmfStat('putRequests'), | 35 MQTT_PUBLISH = Summary('mqtt_publish', 'calls') |
26 scales.PmfStat('statement'), | 36 |
27 scales.PmfStat('mqttPublish'), | 37 mqtt: aiomqtt.Client | None = None |
28 ) | |
29 | 38 |
30 | 39 |
31 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): | 40 class OutputPage: |
32 | 41 |
33 @STATS.putRequests.time() | 42 async def put(self, request: Request) -> PlainTextResponse: |
34 def put(self): | 43 with PUT_REQUESTS.time(): |
35 for stmt in rdf_over_http.rdfStatementsFromRequest(self.request.arguments, self.request.body, self.request.headers): | 44 for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, await request.body(), request.headers): |
36 self._onStatement(stmt) | 45 await self._onStatement(stmt) |
46 return PlainTextResponse("ok") | |
37 | 47 |
38 @STATS.statement.time() | 48 @STATEMENT.time() |
39 def _onStatement(self, stmt): | 49 async def _onStatement(self, stmt): |
40 log.info(f'incoming statement: {stmt}') | 50 log.info(f'incoming statement: {stmt}') |
41 ignored = True | 51 ignored = True |
42 for dev, attrs in devs.items(): | 52 for dev, attrs in devs.items(): |
43 if stmt[0] == ROOM['frontWindow']: | 53 if stmt[0] == ROOM['frontWindow']: |
44 ignored = ignored and self._publishFrontScreenText(stmt) | 54 ignored = ignored and self._publishFrontScreenText(stmt) |
45 if stmt[0:2] == (dev, ROOM['brightness']): | 55 if stmt[0:2] == (dev, ROOM['brightness']): |
46 log.info(f'brightness request: {stmt}') | 56 log.info(f'brightness request: {stmt}') |
47 brightness = stmt[2].toPython() | 57 brightness = stmt[2].toPython() |
48 | 58 |
49 if attrs.get('values', '') == 'binary': | 59 if attrs.get('values', '') == 'binary': |
50 self._publishOnOff(attrs, brightness) | 60 await self._publishOnOff(attrs, brightness) |
51 else: | 61 else: |
52 self._publishRgbw(attrs, brightness) | 62 await self._publishRgbw(attrs, brightness) |
53 ignored = False | 63 ignored = False |
54 if stmt[0:2] == (dev, ROOM['inputSelector']): | 64 if stmt[0:2] == (dev, ROOM['inputSelector']): |
55 choice = stmt[2].toPython().decode('utf8') | 65 choice = stmt[2].toPython() |
56 self._publish(topic=attrs['root'], message=f'input_{choice}') | 66 await self._publish(topic=attrs['root'], message=f'input_{choice}') |
57 ignored = False | 67 ignored = False |
58 if stmt[0:2] == (dev, ROOM['volumeChange']): | 68 if stmt[0:2] == (dev, ROOM['volumeChange']): |
59 delta = int(stmt[2].toPython()) | 69 delta = int(stmt[2].toPython()) |
60 which = 'up' if delta > 0 else 'down' | 70 which = 'up' if delta > 0 else 'down' |
61 self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) | 71 await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) |
62 ignored = False | 72 ignored = False |
63 if stmt[0:2] == (dev, ROOM['color']): | 73 if stmt[0:2] == (dev, ROOM['color']): |
64 h = stmt[2].toPython() | 74 msg = self._onColor(stmt[2].toPython(), attrs) |
65 msg = {} | 75 await self._publish(topic=attrs['root'], message=json.dumps(msg)) |
66 if h.endswith(b'K'): # accept "0.7*2200K" (brightness 0.7) | |
67 # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset | |
68 bright, kelvin = map(float, h[:-1].split(b'*')) | |
69 msg['state'] = 'ON' | |
70 msg["color_temp"] = round(1000000 / kelvin, 2) | |
71 msg['brightness'] = int(bright * 255) # 1..20 look about the same | |
72 else: | |
73 r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) | |
74 msg = { | |
75 'state': 'ON' if r or g or b else 'OFF', | |
76 'color': { | |
77 'r': r, | |
78 'g': g, | |
79 'b': b | |
80 }, | |
81 'brightness': max(r, g, b), | |
82 } | |
83 | |
84 if attrs.get('hasWhite', False): | |
85 msg['white_value'] = max(r, g, b) | |
86 msg.update(attrs.get('defaults', {})) | |
87 self._publish(topic=attrs['root'], message=json.dumps(msg)) | |
88 ignored = False | 76 ignored = False |
89 | 77 |
90 if ignored: | 78 if ignored: |
91 log.warn("ignoring %s", stmt) | 79 log.warn("ignoring %s", stmt) |
92 | 80 |
93 def _publishOnOff(self, attrs, brightness): | 81 def _onColor(self, h, attrs): |
82 if isinstance(h, bytes): | |
83 h = h.decode('utf8') | |
84 msg = {} | |
85 if h.endswith('K'): # accept "0.7*2200K" (brightness 0.7) | |
86 # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset | |
87 bright, kelvin = map(float, h[:-1].split('*')) | |
88 msg['state'] = 'ON' | |
89 msg["color_temp"] = round(1000000 / kelvin, 2) | |
90 msg['brightness'] = int(bright * 255) # 1..20 look about the same | |
91 else: | |
92 r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) | |
93 msg = { | |
94 'state': 'ON' if r or g or b else 'OFF', | |
95 'color': { | |
96 'r': r, | |
97 'g': g, | |
98 'b': b | |
99 }, | |
100 'brightness': max(r, g, b), | |
101 } | |
102 | |
103 if attrs.get('hasWhite', False): | |
104 msg['white_value'] = max(r, g, b) | |
105 msg.update(attrs.get('defaults', {})) | |
106 return msg | |
107 | |
108 async def _publishOnOff(self, attrs, brightness): | |
94 msg = 'OFF' | 109 msg = 'OFF' |
95 if brightness > 0: | 110 if brightness > 0: |
96 msg = 'ON' | 111 msg = 'ON' |
97 self._publish(topic=attrs['root'], message=msg) | 112 await self._publish(topic=attrs['root'], message=msg) |
98 | 113 |
99 def _publishRgbw(self, attrs, brightness): | 114 async def _publishRgbw(self, attrs, brightness): |
100 for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]: | 115 for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]: |
101 self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) | 116 await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) |
102 | 117 |
103 def _publishFrontScreenText(self, stmt): | 118 async def _publishFrontScreenText(self, stmt): |
104 ignored = True | 119 ignored = True |
105 for line in ['line1', 'line2', 'line3', 'line4']: | 120 for line in ['line1', 'line2', 'line3', 'line4']: |
106 if stmt[1] == ROOM[line]: | 121 if stmt[1] == ROOM[line]: |
107 ignored = False | 122 ignored = False |
108 self.settings.mqtt.publish(b'frontwindow/%s' % line.encode('ascii'), stmt[2].toPython()) | 123 assert mqtt is not None |
124 await mqtt.publish('frontwindow/%s' % line, stmt[2].toPython()) | |
109 return ignored | 125 return ignored |
110 | 126 |
111 @STATS.mqttPublish.time() | 127 @MQTT_PUBLISH.time() |
112 def _publish(self, topic: str, messageJson: object = None, message: str = None): | 128 async def _publish(self, topic: str, messageJson: object = None, message: str | None = None): |
113 log.debug(f'mqtt.publish {topic} {message} {messageJson}') | 129 log.debug(f'mqtt.publish {topic} {message} {messageJson}') |
114 if messageJson is not None: | 130 if messageJson is not None: |
115 message = json.dumps(messageJson) | 131 message = json.dumps(messageJson) |
116 self.settings.mqtt.publish(topic.encode('ascii'), message.encode('ascii')) | 132 assert mqtt is not None |
133 await mqtt.publish(topic, message) | |
117 | 134 |
118 | 135 |
119 if __name__ == '__main__': | 136 def main(): |
120 arg = docopt(""" | |
121 Usage: rdf_to_mqtt.py [options] | |
122 | 137 |
123 -v Verbose | 138 async def start2(): |
124 """) | 139 global mqtt |
125 verboseLogging(arg['-v']) | 140 async with aiomqtt.Client(os.environ.get('MOSQUITTO', "mosquitto-ext"), 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt: |
141 log.info(f'connected to mqtt {mqtt}') | |
142 while True: | |
143 await asyncio.sleep(5) | |
126 | 144 |
127 mqtt = MqttClient(clientId='rdf_to_mqtt', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) | 145 def start(): |
146 asyncio.create_task(start2()) | |
128 | 147 |
129 port = 10008 | 148 log.info('make app') |
130 reactor.listenTCP(port, | 149 app = Starlette(debug=True, |
131 cyclone.web.Application([ | 150 on_startup=[start], |
132 (r"/()", cyclone.web.StaticFileHandler, { | 151 routes=[ |
133 "path": ".", | 152 Route('/', StaticFiles(directory='.', html=True)), |
134 "default_filename": "index.html" | 153 Route("/output", OutputPage().put, methods=["PUT"]), |
135 }), | 154 ]) |
136 (r'/output', OutputPage), | 155 app.add_middleware(PrometheusMiddleware, app_name='environment') |
137 (r'/stats/(.*)', StatsHandler, { | 156 app.add_route("/metrics", handle_metrics) |
138 'serverName': 'rdf_to_mqtt' | 157 log.info('return app') |
139 }), | 158 return app |
140 ], | |
141 mqtt=mqtt, | |
142 debug=arg['-v']), | |
143 interface='::') | |
144 log.warn('serving on %s', port) | |
145 | 159 |
146 reactor.run() | 160 |
161 app = main() |