Mercurial > code > home > repos > homeauto
view service/rdf_to_mqtt/rdf_to_mqtt.py @ 1754:92999dfbf321 default tip
add shelly support
author | drewp@bigasterisk.com |
---|---|
date | Tue, 04 Jun 2024 13:03:43 -0700 |
parents | d90cb7c06f15 |
children |
line wrap: on
line source
""" We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"), convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt. This is like light9/bin/collector. """ import asyncio import json import os import time from prometheus_client import Counter, Gauge, Summary from rdflib import Namespace from starlette_exporter import PrometheusMiddleware, handle_metrics from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import PlainTextResponse from starlette.routing import Route from starlette.staticfiles import StaticFiles import aiomqtt from devs import devs import rdf_over_http # from victorialogger import log import logging logging.basicConfig(level=logging.DEBUG) log = logging.getLogger(__name__) ROOM = Namespace('http://projects.bigasterisk.com/room/') PUT_REQUESTS = Summary('put_requests', 'calls') STATEMENT = Summary('on_statement', 'calls') MQTT_PUBLISH = Summary('mqtt_publish', 'calls') mqtt: aiomqtt.Client | None = None class OutputPage: async def put(self, request: Request) -> PlainTextResponse: with PUT_REQUESTS.time(): for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, await request.body(), request.headers): await self._onStatement(stmt) return PlainTextResponse("ok") @STATEMENT.time() async def _onStatement(self, stmt): log.info(f'incoming statement: {stmt}') ignored = True for dev, attrs in devs.items(): if stmt[0] == ROOM['frontWindow']: ignored = ignored and self._publishFrontScreenText(stmt) if stmt[0:2] == (dev, ROOM['brightness']): log.info(f'brightness request: {stmt}') brightness = stmt[2].toPython() if attrs.get('values', '') == 'binary': await self._publishOnOff(attrs, brightness) else: await self._publishRgbw(attrs, brightness) ignored = False if stmt[0:2] == (dev, ROOM['inputSelector']): choice = stmt[2].toPython() await self._publish(topic=attrs['root'], message=f'input_{choice}') ignored = False if stmt[0:2] == (dev, ROOM['volumeChange']): delta = int(stmt[2].toPython()) which = 'up' if delta > 0 else 'down' await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)})) ignored = False if stmt[0:2] == (dev, ROOM['color']): msg = self._onColor(stmt[2].toPython(), attrs) await self._publish(topic=attrs['root'], message=json.dumps(msg)) ignored = False if ignored: log.warn("ignoring %s", stmt) def _onColor(self, h, attrs): if isinstance(h, bytes): h = h.decode('utf8') msg = {} if h.endswith('K'): # accept "0.7*2200K" (brightness 0.7) # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset bright, kelvin = map(float, h[:-1].split('*')) msg['state'] = 'ON' msg["color_temp"] = round(1000000 / kelvin, 2) msg['brightness'] = int(bright * 255) # 1..20 look about the same else: r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16) msg = { 'state': 'ON' if r or g or b else 'OFF', 'color': { 'r': r, 'g': g, 'b': b }, 'brightness': max(r, g, b), } if attrs.get('hasWhite', False): msg['white_value'] = max(r, g, b) msg.update(attrs.get('defaults', {})) return msg async def _publishOnOff(self, attrs, brightness): msg = 'OFF' if brightness > 0: msg = 'ON' await self._publish(topic=attrs['root'], message=msg) async def _publishRgbw(self, attrs, brightness): log.info(f'_publishRgbw {attrs=} {brightness}') for chan, scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]: await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)}) async def _publishFrontScreenText(self, stmt): ignored = True for line in ['line1', 'line2', 'line3', 'line4']: if stmt[1] == ROOM[line]: ignored = False assert mqtt is not None await mqtt.publish('frontwindow/%s' % line, stmt[2].toPython()) return ignored @MQTT_PUBLISH.time() async def _publish(self, topic: str, messageJson: object = None, message: str | None = None): log.debug(f'mqtt.publish {topic} {message} {messageJson}') if messageJson is not None: message = json.dumps(messageJson) if mqtt is None: os.abort() try: await mqtt.publish(topic, message) except aiomqtt.error.MqttCodeError: log.error(f"publish {topic=} {message=} failed:", exc_info=1) os.abort() def main(): async def start2(): global mqtt async with aiomqtt.Client('mqtt2.bigasterisk.com', 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt: log.info(f'connected to mqtt {mqtt}') while True: await asyncio.sleep(5) def start(): asyncio.create_task(start2()) log.info('make app') app = Starlette(debug=True, on_startup=[start], routes=[ Route('/', StaticFiles(directory='.', html=True)), Route("/output", OutputPage().put, methods=["PUT"]), ]) app.add_middleware(PrometheusMiddleware, app_name='rdf_to_mqtt') app.add_route("/metrics", handle_metrics) log.info('return app') return app app = main()