comparison service/rdf_to_mqtt/rdf_to_mqtt.py @ 1732:3f4b447d65f5

port to starlette/asyncio
author drewp@bigasterisk.com
date Mon, 10 Jul 2023 17:37:58 -0700
parents 80b01d548b9c
children 09df2b4b886f
comparison
equal deleted inserted replaced
1731:35abc7656f0f 1732:3f4b447d65f5
2 We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"), 2 We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"),
3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt. 3 convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt.
4 4
5 This is like light9/bin/collector. 5 This is like light9/bin/collector.
6 """ 6 """
7 import asyncio
7 import json 8 import json
9 import os
10 import time
8 11
9 import cyclone.web 12 from prometheus_client import Counter, Gauge, Summary
10 from cycloneerr import PrettyErrorHandler
11 from docopt import docopt
12 from greplin import scales
13 from greplin.scales.cyclonehandler import StatsHandler
14 from mqtt_client import MqttClient
15 from rdflib import Namespace 13 from rdflib import Namespace
16 from standardservice.logsetup import log, verboseLogging 14 from starlette_exporter import PrometheusMiddleware, handle_metrics
17 from twisted.internet import reactor 15 from starlette.applications import Starlette
16 from starlette.requests import Request
17 from starlette.responses import PlainTextResponse
18 from starlette.routing import Route
19 from starlette.staticfiles import StaticFiles
20 import aiomqtt
21
18 from devs import devs 22 from devs import devs
19 import rdf_over_http 23 import rdf_over_http
20 24
25 # from victorialogger import log
26 import logging
27
28 logging.basicConfig(level=logging.DEBUG)
29 log = logging.getLogger(__name__)
30
21 ROOM = Namespace('http://projects.bigasterisk.com/room/') 31 ROOM = Namespace('http://projects.bigasterisk.com/room/')
22 32
23 STATS = scales.collection( 33 PUT_REQUESTS = Summary('put_requests', 'calls')
24 '/root', 34 STATEMENT = Summary('on_statement', 'calls')
25 scales.PmfStat('putRequests'), 35 MQTT_PUBLISH = Summary('mqtt_publish', 'calls')
26 scales.PmfStat('statement'), 36
27 scales.PmfStat('mqttPublish'), 37 mqtt: aiomqtt.Client | None = None
28 )
29 38
30 39
31 class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): 40 class OutputPage:
32 41
33 @STATS.putRequests.time() 42 async def put(self, request: Request) -> PlainTextResponse:
34 def put(self): 43 with PUT_REQUESTS.time():
35 for stmt in rdf_over_http.rdfStatementsFromRequest(self.request.arguments, self.request.body, self.request.headers): 44 for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, await request.body(), request.headers):
36 self._onStatement(stmt) 45 await self._onStatement(stmt)
46 return PlainTextResponse("ok")
37 47
38 @STATS.statement.time() 48 @STATEMENT.time()
39 def _onStatement(self, stmt): 49 async def _onStatement(self, stmt):
40 log.info(f'incoming statement: {stmt}') 50 log.info(f'incoming statement: {stmt}')
41 ignored = True 51 ignored = True
42 for dev, attrs in devs.items(): 52 for dev, attrs in devs.items():
43 if stmt[0] == ROOM['frontWindow']: 53 if stmt[0] == ROOM['frontWindow']:
44 ignored = ignored and self._publishFrontScreenText(stmt) 54 ignored = ignored and self._publishFrontScreenText(stmt)
45 if stmt[0:2] == (dev, ROOM['brightness']): 55 if stmt[0:2] == (dev, ROOM['brightness']):
46 log.info(f'brightness request: {stmt}') 56 log.info(f'brightness request: {stmt}')
47 brightness = stmt[2].toPython() 57 brightness = stmt[2].toPython()
48 58
49 if attrs.get('values', '') == 'binary': 59 if attrs.get('values', '') == 'binary':
50 self._publishOnOff(attrs, brightness) 60 await self._publishOnOff(attrs, brightness)
51 else: 61 else:
52 self._publishRgbw(attrs, brightness) 62 await self._publishRgbw(attrs, brightness)
53 ignored = False 63 ignored = False
54 if stmt[0:2] == (dev, ROOM['inputSelector']): 64 if stmt[0:2] == (dev, ROOM['inputSelector']):
55 choice = stmt[2].toPython().decode('utf8') 65 choice = stmt[2].toPython()
56 self._publish(topic=attrs['root'], message=f'input_{choice}') 66 await self._publish(topic=attrs['root'], message=f'input_{choice}')
57 ignored = False 67 ignored = False
58 if stmt[0:2] == (dev, ROOM['volumeChange']): 68 if stmt[0:2] == (dev, ROOM['volumeChange']):
59 delta = int(stmt[2].toPython()) 69 delta = int(stmt[2].toPython())
60 which = 'up' if delta > 0 else 'down' 70 which = 'up' if delta > 0 else 'down'
61 self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) 71 await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)}))
62 ignored = False 72 ignored = False
63 if stmt[0:2] == (dev, ROOM['color']): 73 if stmt[0:2] == (dev, ROOM['color']):
64 h = stmt[2].toPython() 74 msg = self._onColor(stmt[2].toPython(), attrs)
65 msg = {} 75 await self._publish(topic=attrs['root'], message=json.dumps(msg))
66 if h.endswith(b'K'): # accept "0.7*2200K" (brightness 0.7)
67 # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset
68 bright, kelvin = map(float, h[:-1].split(b'*'))
69 msg['state'] = 'ON'
70 msg["color_temp"] = round(1000000 / kelvin, 2)
71 msg['brightness'] = int(bright * 255) # 1..20 look about the same
72 else:
73 r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
74 msg = {
75 'state': 'ON' if r or g or b else 'OFF',
76 'color': {
77 'r': r,
78 'g': g,
79 'b': b
80 },
81 'brightness': max(r, g, b),
82 }
83
84 if attrs.get('hasWhite', False):
85 msg['white_value'] = max(r, g, b)
86 msg.update(attrs.get('defaults', {}))
87 self._publish(topic=attrs['root'], message=json.dumps(msg))
88 ignored = False 76 ignored = False
89 77
90 if ignored: 78 if ignored:
91 log.warn("ignoring %s", stmt) 79 log.warn("ignoring %s", stmt)
92 80
93 def _publishOnOff(self, attrs, brightness): 81 def _onColor(self, h, attrs):
82 if isinstance(h, bytes):
83 h = h.decode('utf8')
84 msg = {}
85 if h.endswith('K'): # accept "0.7*2200K" (brightness 0.7)
86 # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset
87 bright, kelvin = map(float, h[:-1].split('*'))
88 msg['state'] = 'ON'
89 msg["color_temp"] = round(1000000 / kelvin, 2)
90 msg['brightness'] = int(bright * 255) # 1..20 look about the same
91 else:
92 r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
93 msg = {
94 'state': 'ON' if r or g or b else 'OFF',
95 'color': {
96 'r': r,
97 'g': g,
98 'b': b
99 },
100 'brightness': max(r, g, b),
101 }
102
103 if attrs.get('hasWhite', False):
104 msg['white_value'] = max(r, g, b)
105 msg.update(attrs.get('defaults', {}))
106 return msg
107
108 async def _publishOnOff(self, attrs, brightness):
94 msg = 'OFF' 109 msg = 'OFF'
95 if brightness > 0: 110 if brightness > 0:
96 msg = 'ON' 111 msg = 'ON'
97 self._publish(topic=attrs['root'], message=msg) 112 await self._publish(topic=attrs['root'], message=msg)
98 113
99 def _publishRgbw(self, attrs, brightness): 114 async def _publishRgbw(self, attrs, brightness):
100 for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]: 115 for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]:
101 self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) 116 await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)})
102 117
103 def _publishFrontScreenText(self, stmt): 118 async def _publishFrontScreenText(self, stmt):
104 ignored = True 119 ignored = True
105 for line in ['line1', 'line2', 'line3', 'line4']: 120 for line in ['line1', 'line2', 'line3', 'line4']:
106 if stmt[1] == ROOM[line]: 121 if stmt[1] == ROOM[line]:
107 ignored = False 122 ignored = False
108 self.settings.mqtt.publish(b'frontwindow/%s' % line.encode('ascii'), stmt[2].toPython()) 123 assert mqtt is not None
124 await mqtt.publish('frontwindow/%s' % line, stmt[2].toPython())
109 return ignored 125 return ignored
110 126
111 @STATS.mqttPublish.time() 127 @MQTT_PUBLISH.time()
112 def _publish(self, topic: str, messageJson: object = None, message: str = None): 128 async def _publish(self, topic: str, messageJson: object = None, message: str | None = None):
113 log.debug(f'mqtt.publish {topic} {message} {messageJson}') 129 log.debug(f'mqtt.publish {topic} {message} {messageJson}')
114 if messageJson is not None: 130 if messageJson is not None:
115 message = json.dumps(messageJson) 131 message = json.dumps(messageJson)
116 self.settings.mqtt.publish(topic.encode('ascii'), message.encode('ascii')) 132 assert mqtt is not None
133 await mqtt.publish(topic, message)
117 134
118 135
119 if __name__ == '__main__': 136 def main():
120 arg = docopt("""
121 Usage: rdf_to_mqtt.py [options]
122 137
123 -v Verbose 138 async def start2():
124 """) 139 global mqtt
125 verboseLogging(arg['-v']) 140 async with aiomqtt.Client(os.environ.get('MOSQUITTO', "mosquitto-ext"), 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt:
141 log.info(f'connected to mqtt {mqtt}')
142 while True:
143 await asyncio.sleep(5)
126 144
127 mqtt = MqttClient(clientId='rdf_to_mqtt', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) 145 def start():
146 asyncio.create_task(start2())
128 147
129 port = 10008 148 log.info('make app')
130 reactor.listenTCP(port, 149 app = Starlette(debug=True,
131 cyclone.web.Application([ 150 on_startup=[start],
132 (r"/()", cyclone.web.StaticFileHandler, { 151 routes=[
133 "path": ".", 152 Route('/', StaticFiles(directory='.', html=True)),
134 "default_filename": "index.html" 153 Route("/output", OutputPage().put, methods=["PUT"]),
135 }), 154 ])
136 (r'/output', OutputPage), 155 app.add_middleware(PrometheusMiddleware, app_name='environment')
137 (r'/stats/(.*)', StatsHandler, { 156 app.add_route("/metrics", handle_metrics)
138 'serverName': 'rdf_to_mqtt' 157 log.info('return app')
139 }), 158 return app
140 ],
141 mqtt=mqtt,
142 debug=arg['-v']),
143 interface='::')
144 log.warn('serving on %s', port)
145 159
146 reactor.run() 160
161 app = main()