view service/rdf_to_mqtt/rdf_to_mqtt.py @ 1744:09df2b4b886f

deployment
author drewp@bigasterisk.com
date Thu, 09 Nov 2023 17:21:33 -0800
parents 3f4b447d65f5
children d90cb7c06f15
line wrap: on
line source

"""
We get output statements that are like light9's deviceAttrs (:dev1 :color "#ff0000"),
convert those to outputAttrs (:dev1 :red 255; :green 0; :blue 0) and post them to mqtt.

This is like light9/bin/collector.
"""
import asyncio
import json
import os
import time

from prometheus_client import Counter, Gauge, Summary
from rdflib import Namespace
from starlette_exporter import PrometheusMiddleware, handle_metrics
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.routing import Route
from starlette.staticfiles import StaticFiles
import aiomqtt

from devs import devs
import rdf_over_http

# from victorialogger import log
import logging

# Root logger at DEBUG so the per-publish log.debug lines in this module are emitted.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)

# Vocabulary namespace; subjects and predicates of incoming statements are
# compared against terms built from it (e.g. ROOM['brightness']).
ROOM = Namespace('http://projects.bigasterisk.com/room/')

# Prometheus summaries used to time request handling, per-statement handling,
# and MQTT publishes respectively.
PUT_REQUESTS = Summary('put_requests', 'calls')
STATEMENT = Summary('on_statement', 'calls')
MQTT_PUBLISH = Summary('mqtt_publish', 'calls')

# Shared MQTT client. Stays None until the startup task in main() has connected.
mqtt: aiomqtt.Client | None = None


class OutputPage:
    """Request handler that converts incoming RDF statements to MQTT messages.

    Each statement is matched against the devices in `devs` (and the special
    ROOM['frontWindow'] subject) and translated into one or more publishes on
    the module-level `mqtt` client.
    """

    async def put(self, request: Request) -> PlainTextResponse:
        """Parse the statements in the request and handle each one in order.

        Returns a plain-text "ok" response once every statement was handled.
        """
        with PUT_REQUESTS.time():
            body = await request.body()
            for stmt in rdf_over_http.rdfStatementsFromRequest(request.query_params, body, request.headers):
                await self._onStatement(stmt)
        return PlainTextResponse("ok")

    async def _onStatement(self, stmt):
        """Dispatch one (subject, predicate, object) statement to MQTT.

        Logs a warning when nothing recognized the statement.
        """
        # Timed with a context manager instead of a decorator: decorating an
        # `async def` would only measure creating the coroutine, not the
        # awaited publishes inside it.
        with STATEMENT.time():
            log.info(f'incoming statement: {stmt}')
            ignored = True

            # The front window display is not one of `devs`, so check it once,
            # outside the device loop. The coroutine must be awaited, otherwise
            # nothing is published and the (truthy) coroutine object leaves
            # `ignored` set.
            if stmt[0] == ROOM['frontWindow']:
                ignored = await self._publishFrontScreenText(stmt)

            for dev, attrs in devs.items():
                if stmt[0:2] == (dev, ROOM['brightness']):
                    log.info(f'brightness request: {stmt}')
                    brightness = stmt[2].toPython()

                    if attrs.get('values', '') == 'binary':
                        await self._publishOnOff(attrs, brightness)
                    else:
                        await self._publishRgbw(attrs, brightness)
                    ignored = False
                if stmt[0:2] == (dev, ROOM['inputSelector']):
                    choice = stmt[2].toPython()
                    await self._publish(topic=attrs['root'], message=f'input_{choice}')
                    ignored = False
                if stmt[0:2] == (dev, ROOM['volumeChange']):
                    delta = int(stmt[2].toPython())
                    which = 'up' if delta > 0 else 'down'
                    await self._publish(topic=f'theater_blaster/ir_out/volume_{which}', message=json.dumps({'timed': abs(delta)}))
                    ignored = False
                if stmt[0:2] == (dev, ROOM['color']):
                    msg = self._onColor(stmt[2].toPython(), attrs)
                    await self._publish(topic=attrs['root'], message=json.dumps(msg))
                    ignored = False

            if ignored:
                log.warning("ignoring %s", stmt)

    def _onColor(self, h, attrs):
        """Build the JSON-able message dict for a color request.

        `h` is either a color temperature string like "0.7*2200K"
        (brightness 0.7 at 2200 kelvin) or a hex color like "#ff0000";
        bytes are decoded as utf8 first. `attrs` is the device's config:
        a truthy 'hasWhite' adds 'white_value' to hex-color messages, and
        'defaults' (a dict) is merged over the result.
        """
        if isinstance(h, bytes):
            h = h.decode('utf8')
        msg = {}
        if h.endswith('K'):  # accept "0.7*2200K" (brightness 0.7)
            # see https://www.zigbee2mqtt.io/information/mqtt_topics_and_message_structure.html#zigbee2mqttfriendly_nameset
            bright, kelvin = map(float, h[:-1].split('*'))
            msg['state'] = 'ON'
            # 1000000 / kelvin is the color temperature in mireds.
            msg["color_temp"] = round(1000000 / kelvin, 2)
            msg['brightness'] = int(bright * 255)  # 1..20 look about the same
        else:
            r, g, b = int(h[1:3], 16), int(h[3:5], 16), int(h[5:7], 16)
            msg = {
                'state': 'ON' if r or g or b else 'OFF',
                'color': {
                    'r': r,
                    'g': g,
                    'b': b
                },
                'brightness': max(r, g, b),
            }

            if attrs.get('hasWhite', False):
                msg['white_value'] = max(r, g, b)
        msg.update(attrs.get('defaults', {}))
        return msg

    async def _publishOnOff(self, attrs, brightness):
        """Publish 'ON' to the device's root topic if brightness > 0, else 'OFF'."""
        msg = 'ON' if brightness > 0 else 'OFF'
        await self._publish(topic=attrs['root'], message=msg)

    async def _publishRgbw(self, attrs, brightness):
        """Publish the same ON/brightness command to each of the w1/r/g/b channels."""
        # NOTE(review): the per-channel scale factor is listed but never applied;
        # every channel receives int(brightness * 255). Confirm whether g/b were
        # meant to be dimmed to 0.8 before changing the output.
        for chan, _scale in [('w1', 1), ('r', 1), ('g', .8), ('b', .8)]:
            await self._publish(topic=f"{attrs['root']}/light/kit_{chan}/command", messageJson={'state': 'ON', 'brightness': int(brightness * 255)})

    async def _publishFrontScreenText(self, stmt):
        """Publish a front-window text line if the predicate is line1..line4.

        Returns True when the statement was NOT handled (i.e. still ignored).
        """
        ignored = True
        for line in ['line1', 'line2', 'line3', 'line4']:
            if stmt[1] == ROOM[line]:
                ignored = False
                # Go through _publish so this path is logged and timed like
                # every other publish.
                await self._publish(topic='frontwindow/%s' % line, message=stmt[2].toPython())
        return ignored

    async def _publish(self, topic: str, messageJson: object = None, message: str | None = None):
        """Publish to `topic` on the shared MQTT client.

        If `messageJson` is given it is JSON-encoded and replaces `message`.
        Raises RuntimeError if the MQTT client has not been connected yet.
        """
        # Context manager (not a decorator) so the awaited publish is included
        # in the measured time.
        with MQTT_PUBLISH.time():
            log.debug(f'mqtt.publish {topic} {message} {messageJson}')
            if messageJson is not None:
                message = json.dumps(messageJson)
            # Explicit check instead of `assert`, which is stripped under -O.
            if mqtt is None:
                raise RuntimeError('mqtt client is not connected yet')
            await mqtt.publish(topic, message)


def main():
    """Build the Starlette app and arrange for the MQTT client to connect at startup.

    Returns the configured Starlette application. The MQTT connection runs in
    a background task that assigns the module-level `mqtt` client while
    connected.
    """
    # The event loop keeps only weak references to tasks, so hold a strong
    # reference here or the connection task may be garbage-collected mid-run.
    background_tasks: set[asyncio.Task] = set()

    async def start2():
        """Connect to the broker and keep the connection open until cancelled."""
        global mqtt
        try:
            async with aiomqtt.Client('mqtt2.bigasterisk.com', 1883, client_id="rdf_to_mqtt-%s" % time.time(), keepalive=6) as mqtt:
                log.info(f'connected to mqtt {mqtt}')
                while True:
                    await asyncio.sleep(5)
        finally:
            # Don't leave a closed client visible to the request handlers.
            mqtt = None

    def task_done(task):
        """Drop the finished task and surface its failure in the log."""
        background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error('mqtt connection task failed', exc_info=exc)

    def start():
        task = asyncio.create_task(start2())
        background_tasks.add(task)
        task.add_done_callback(task_done)

    log.info('make app')
    # NOTE(review): debug=True is set for this app; confirm that is wanted in deployment.
    app = Starlette(debug=True,
                    on_startup=[start],
                    routes=[
                        Route('/', StaticFiles(directory='.', html=True)),
                        Route("/output", OutputPage().put, methods=["PUT"]),
                    ])
    app.add_middleware(PrometheusMiddleware, app_name='environment')
    app.add_route("/metrics", handle_metrics)
    log.info('return app')
    return app


# Module-level application object built at import time.
# NOTE(review): presumably loaded by an ASGI server as `rdf_to_mqtt:app` — confirm against the deployment config.
app = main()