Mercurial > code > home > repos > homeauto
diff service/rdf_to_mqtt/rdf_to_mqtt.py @ 1732:3f4b447d65f5
port to starlette/asyncio
author | drewp@bigasterisk.com |
---|---|
date | Mon, 10 Jul 2023 17:37:58 -0700 |
parents | 80b01d548b9c |
children | 09df2b4b886f |
line wrap: on
line diff
--- a/service/rdf_to_mqtt/rdf_to_mqtt.py Fri Jun 30 22:11:06 2023 -0700 +++ b/service/rdf_to_mqtt/rdf_to_mqtt.py Mon Jul 10 17:37:58 2023 -0700 @@ -4,39 +4,49 @@ This is like light9/bin/collector. """ +import asyncio import json +import os +import time -import cyclone.web -from cycloneerr import PrettyErrorHandler -from docopt import docopt -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler -from mqtt_client import MqttClient +from prometheus_client import Counter, Gauge, Summary from rdflib import Namespace -from standardservice.logsetup import log, verboseLogging -from twisted.internet import reactor +from starlette_exporter import PrometheusMiddleware, handle_metrics +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import PlainTextResponse +from starlette.routing import Route +from starlette.staticfiles import StaticFiles +import aiomqtt + from devs import devs import rdf_over_http +# from victorialogger import log +import logging + +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger(__name__) + ROOM = Namespace('http://projects.bigasterisk.com/room/') -STATS = scales.collection( - '/root', - scales.PmfStat('putRequests'), - scales.PmfStat('statement'), - scales.PmfStat('mqttPublish'), -) +PUT_REQUESTS = Summary('put_requests', 'calls') +STATEMENT = Summary('on_statement', 'calls') +MQTT_PUBLISH = Summary('mqtt_publish', 'calls') + +mqtt: aiomqtt.Client | None = None -class OutputPage(PrettyErrorHandler, cyclone.web.RequestHandler): +class OutputPage: - @STATS.putRequests.time() - def put(self): - for stmt in rdf_over_http.rdfStatementsFromRequest(self.request.arguments, self.request.body, self.request.headers): - self._onStatement(stmt) + async def put(self, request: Request) -> PlainTextResponse: + with PUT_REQUESTS.time(): + for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, await request.body(), request.headers): + await self._onStatement(stmt) + return PlainTextResponse("ok") - @STATS.statement.time() - def _onStatement(self, stmt): + @STATEMENT.time() + async def _onStatement(self, stmt): log.info(f'incoming statement: {stmt}') ignored = True for dev, attrs in devs.items(): @@ -47,100 +57,105 @@ brightness = stmt[2].toPython() if attrs.get('values', '') == 'binary': - self._publishOnOff(attrs, brightness) + await self._publishOnOff(attrs, brightness) else: - self._publishRgbw(attrs, brightness) + await self._publishRgbw(attrs, brightness) ignored = False if stmt[0:2] == (dev, ROOM['inputSelector']): - choice = stmt[2].toPython().decode('utf8') - self._publish(topic=attrs['root'], message=f'input_{choice}') + choice = stmt[2].toPython() + await self._publish(topic=attrs['root'], message=f'input_{choice}') ignored = False if stmt[0:2] == (dev, ROOM['volumeChange']): delta = int(stmt[2].toPython()) which = 'up' if delta > 0 else 'down' - self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) + await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) ignored = False if stmt[0:2] == (dev, ROOM['color']): - h = stmt[2].toPython() - msg = {} - if h.endswith(b'K'): # accept "0.7*2200K" (brightness 0.7) - # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset - bright, kelvin = map(float, h[:-1].split(b'*')) - msg['state'] = 'ON' - msg["color_temp"] = round(1000000 / kelvin, 2) - msg['brightness'] = int(bright * 255) # 1..20 look about the same - else: - r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) - msg = { - 'state': 'ON' if r or g or b else 'OFF', - 'color': { - 'r': r, - 'g': g, - 'b': b - }, - 'brightness': max(r, g, b), - } - - if attrs.get('hasWhite', False): - msg['white_value'] = max(r, g, b) - msg.update(attrs.get('defaults', {})) - self._publish(topic=attrs['root'], message=json.dumps(msg)) + msg = self._onColor(stmt[2].toPython(), attrs) + await self._publish(topic=attrs['root'], message=json.dumps(msg)) ignored = False if ignored: log.warn("ignoring %s", stmt) - def _publishOnOff(self, attrs, brightness): + def _onColor(self, h, attrs): + if isinstance(h, bytes): + h = h.decode('utf8') + msg = {} + if h.endswith('K'): # accept "0.7*2200K" (brightness 0.7) + # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset + bright, kelvin = map(float, h[:-1].split('*')) + msg['state'] = 'ON' + msg["color_temp"] = round(1000000 / kelvin, 2) + msg['brightness'] = int(bright * 255) # 1..20 look about the same + else: + r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) + msg = { + 'state': 'ON' if r or g or b else 'OFF', + 'color': { + 'r': r, + 'g': g, + 'b': b + }, + 'brightness': max(r, g, b), + } + + if attrs.get('hasWhite', False): + msg['white_value'] = max(r, g, b) + msg.update(attrs.get('defaults', {})) + return msg + + async def _publishOnOff(self, attrs, brightness): msg = 'OFF' if brightness > 0: msg = 'ON' - self._publish(topic=attrs['root'], message=msg) + await self._publish(topic=attrs['root'], message=msg) - def _publishRgbw(self, attrs, brightness): + async def _publishRgbw(self, attrs, brightness): for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]: - self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) + await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) - def _publishFrontScreenText(self, stmt): + async def _publishFrontScreenText(self, stmt): ignored = True for line in ['line1', 'line2', 'line3', 'line4']: if stmt[1] == ROOM[line]: ignored = False - self.settings.mqtt.publish(b'frontwindow/%s' % line.encode('ascii'), stmt[2].toPython()) + assert mqtt is not None + await mqtt.publish('frontwindow/%s' % line, stmt[2].toPython()) return ignored - @STATS.mqttPublish.time() - def _publish(self, topic: str, messageJson: object = None, message: str = None): + @MQTT_PUBLISH.time() + async def _publish(self, topic: str, messageJson: object = None, message: str | None = None): log.debug(f'mqtt.publish {topic} {message} {messageJson}') if messageJson is not None: message = json.dumps(messageJson) - self.settings.mqtt.publish(topic.encode('ascii'), message.encode('ascii')) + assert mqtt is not None + await mqtt.publish(topic, message) -if __name__ == '__main__': - arg = docopt(""" - Usage: rdf_to_mqtt.py [options] +def main(): - -v Verbose - """) - verboseLogging(arg['-v']) + async def start2(): + global mqtt + async with aiomqtt.Client(os.environ.get('MOSQUITTO', "mosquitto-ext"), 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt: + log.info(f'connected to mqtt {mqtt}') + while True: + await asyncio.sleep(5) - mqtt = MqttClient(clientId='rdf_to_mqtt', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) + def start(): + asyncio.create_task(start2()) - port = 10008 - reactor.listenTCP(port, - cyclone.web.Application([ - (r"/()", cyclone.web.StaticFileHandler, { - "path": ".", - "default_filename": "index.html" - }), - (r'/output', OutputPage), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'rdf_to_mqtt' - }), - ], - mqtt=mqtt, - debug=arg['-v']), - interface='::') - log.warn('serving on %s', port) + log.info('make app') + app = Starlette(debug=True, + on_startup=[start], + routes=[ + Route('/', StaticFiles(directory='.', html=True)), + Route("/output", OutputPage().put, methods=["PUT"]), + ]) + app.add_middleware(PrometheusMiddleware, app_name='environment') + app.add_route("/metrics", handle_metrics) + log.info('return app') + return app - reactor.run() + +app = main() \ No newline at end of file