view sensor_bridge.py @ 1:087b57dd3587 default tip

if dimcurve request times out, abort so we get its new address
author drewp@bigasterisk.com
date Sat, 14 Dec 2024 21:39:44 -0800
parents 62cca1da7955
children
line wrap: on
line source

import asyncio
import logging
import os
from typing import cast

import aiomqtt
import httpx
from rdferry import StarletteServer

# Log at INFO and above; `log` is the root logger (getLogger() with no name).
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# Host (and possibly port) of the dimcurve service, read once at import time.
# A missing variable raises KeyError here, so the process fails at startup.
# Because the value is only read here, a changed address is picked up only
# after the process restarts (see the os.abort() in sendValue).
DIMCURVE_SERVICE_HOST = os.environ['DIMCURVE_SERVICE_HOST']

# Single shared async HTTP client, reused for every request to dimcurve.
# NOTE(review): nothing in this file closes it explicitly.
httpClient = httpx.AsyncClient()


def faderValueUrl(faderName: str):
    """Return the dimcurve HTTP URL for the 'value' resource of fader `faderName`.

    The host part comes from the module-level DIMCURVE_SERVICE_HOST; the
    fader name is inserted into the path as-is (no escaping is applied).
    """
    fadersRoot = f'http://{DIMCURVE_SERVICE_HOST}/dimcurve/api/faders'
    return f'{fadersRoot}/{faderName}/value'


async def sendValue(faderName: str, value: float):
    """PUT `value` to the dimcurve fader `faderName`; abort the process on failure.

    The value is sent as the ASCII text of `str(value)` with a 2 second
    timeout. Any exception raised while building or sending the request
    (including a timeout) is logged and followed by os.abort(), which
    terminates the process immediately. The HTTP response itself is not
    inspected here.

    NOTE(review): the log text says "restarting service", so presumably an
    external supervisor relaunches the process after the abort -- confirm.
    """
    try:
        url = faderValueUrl(faderName)
        body = str(value).encode('ascii')
        await httpClient.put(url, content=body, timeout=2)
    except Exception:
        # Deliberately fatal: a fresh process re-reads DIMCURVE_SERVICE_HOST.
        log.warning("put failed - restarting service", exc_info=True)
        os.abort()


async def onMessage(message):
    """Handle one MQTT message by setting the 'tr' fader to the mapped level.

    The message topic and payload are logged, then the payload is looked up
    in a fixed table of button actions (b'1_single' .. b'4_single') that
    maps to fader levels 1, 0.50, 0.25 and 0.

    Raises:
        ValueError: if the payload is not one of the known actions; the
            exception message is `str(message.payload)`.
    """
    log.info(f'{message.topic}: {message.payload}')
    levelForAction = {
        b'1_single': 1,
        b'2_single': 0.50,
        b'3_single': 0.25,
        b'4_single': 0,
    }
    action = cast(bytes, message.payload)
    # Guard clause: anything outside the table is rejected.
    if action not in levelForAction:
        raise ValueError(str(message.payload))
    await sendValue('tr', levelForAction[action])


async def watchMqtt():
    """Connect to the MQTT broker and dispatch button-action messages to onMessage.

    Subscribes to the action topics of the two 'tr-4button' devices and then
    processes incoming messages one at a time. An exception raised while
    handling a single message is logged and the loop continues with the next
    message; exceptions from the connection or subscription propagate to the
    caller.
    """
    actionTopics = (
        'zigbee/tr-4button-1/action',
        'zigbee/tr-4button-2/action',
    )
    client = aiomqtt.Client('mqtt2.bigasterisk.com')
    async with client:
        for topic in actionTopics:
            await client.subscribe(topic)
        async for message in client.messages:
            try:
                await onMessage(message)
            except Exception:
                # One bad message must not end the subscription.
                log.warning("onMessage failed", exc_info=True)


async def watchMqttReconnect(retryDelaySecs: float = 1.0):
    """Run watchMqtt forever, starting it again whenever it stops.

    Args:
        retryDelaySecs: seconds to wait after watchMqtt ends (by exception
            or by returning) before starting it again.

    Exceptions from watchMqtt are logged with their traceback and otherwise
    ignored. This coroutine never returns normally; it ends only when it is
    cancelled (cancellation is not an `Exception`, so it is not swallowed).
    """
    while True:
        try:
            await watchMqtt()
        except Exception:
            log.warning("watchMqtt restarting", exc_info=True)
        # Pause before reconnecting: without this, an unreachable broker
        # makes the loop spin at full speed, hammering the broker and
        # flooding the log with tracebacks.
        await asyncio.sleep(retryDelaySecs)


def main():
    """Create the rdferry StarletteServer and serve with the MQTT watcher as a startup task.

    NOTE(review): `return app` reads the module-level name `app`, but the
    visible source only assigns `app` from this function's own return value
    (the `app = main()` line below). If `server.serve(...)` ever returns,
    the `return` would raise NameError. Presumably serve() blocks for the
    life of the process so the line is never reached -- confirm against
    rdferry, and return the intended object (or drop the return) if not.
    """

    server = StarletteServer()
    # The watchMqttReconnect() coroutine object is handed to the server as a startup task.
    server.serve(startup_tasks=[watchMqttReconnect()])

    return app


# Runs at import time: importing this module starts the server.
app = main()