Mercurial > code > home > repos > sensor-bridge
view sensor_bridge.py @ 1:087b57dd3587 default tip
if dimcurve request times out, abort so we get its new address
author | drewp@bigasterisk.com |
---|---|
date | Sat, 14 Dec 2024 21:39:44 -0800 |
parents | 62cca1da7955 |
children |
line wrap: on
line source
"""Bridge zigbee button actions arriving over MQTT to a dimcurve fader.

Subscribes to the 'action' topics of two 4-button remotes, maps each
recognized payload to a fader level, and PUTs that level to the dimcurve
service named by the DIMCURVE_SERVICE_HOST environment variable.
"""
import asyncio
import logging
import os
from typing import cast

import aiomqtt
import httpx
from rdferry import StarletteServer

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# Read once at import time; a KeyError here means the variable is unset.
DIMCURVE_SERVICE_HOST = os.environ['DIMCURVE_SERVICE_HOST']

# Pause between MQTT (re)connect attempts so an unreachable broker does not
# turn the reconnect loop into a busy loop of retries and tracebacks.
MQTT_RECONNECT_DELAY_SECS = 2

# Button action payload -> fader level. The values are kept as the exact
# int/float literals because str(value) becomes the PUT body.
PAYLOAD_TO_VALUE = {
    b'1_single': 1,
    b'2_single': 0.50,
    b'3_single': 0.25,
    b'4_single': 0,
}

httpClient = httpx.AsyncClient()


def faderValueUrl(faderName: str):
    """Return the dimcurve URL for the value of the fader `faderName`."""
    return f'http://{DIMCURVE_SERVICE_HOST}/dimcurve/api/faders/{faderName}/value'


async def sendValue(faderName: str, value: float):
    """PUT `value` (ASCII-encoded str(value)) to the fader `faderName`.

    If the request raises (including the 2 second timeout), the failure is
    logged and the process is aborted. DIMCURVE_SERVICE_HOST is only read at
    startup, so presumably a supervisor restarts this service and it picks up
    dimcurve's new address -- confirm against the deployment.

    An HTTP error status does not abort; it is only logged.
    """
    try:
        response = await httpClient.put(
            faderValueUrl(faderName),
            content=str(value).encode('ascii'),
            timeout=2,
        )
    except Exception:
        log.warning("put failed - restarting service", exc_info=True)
        os.abort()
    # os.abort() never returns, so `response` is always bound here.
    if response.status_code >= 400:
        log.warning("put to fader %r returned HTTP %s", faderName, response.status_code)


async def onMessage(message):
    """Translate one MQTT button message into a fader update on 'tr'.

    Raises:
        ValueError: if the payload is not one of the known button actions.
    """
    log.info('%s: %s', message.topic, message.payload)

    value = PAYLOAD_TO_VALUE.get(cast(bytes, message.payload))
    # Compare against None explicitly: 0 is a valid fader level.
    if value is None:
        raise ValueError(str(message.payload))
    await sendValue('tr', value)


async def watchMqtt():
    """Connect to the MQTT broker and handle button messages until it fails.

    A failure while handling a single message is logged and skipped; errors
    from the connection itself propagate to the caller.
    """
    client = aiomqtt.Client('mqtt2.bigasterisk.com')
    async with client:
        await client.subscribe('zigbee/tr-4button-1/action')
        await client.subscribe('zigbee/tr-4button-2/action')
        async for message in client.messages:
            try:
                await onMessage(message)
            except Exception:
                log.warning("onMessage failed", exc_info=True)


async def watchMqttReconnect():
    """Run watchMqtt forever, restarting it after a short delay when it ends."""
    while True:
        try:
            await watchMqtt()
        except Exception:
            log.warning("watchMqtt restarting", exc_info=True)
        # Back off before reconnecting, whether watchMqtt raised or returned.
        await asyncio.sleep(MQTT_RECONNECT_DELAY_SECS)


def main():
    """Start the server with the MQTT watcher as a startup task."""
    server = StarletteServer()
    server.serve(startup_tasks=[watchMqttReconnect()])
    # NOTE(review): `app` is the module global assigned from this very call,
    # so it is not bound yet when this line runs; if serve() ever returns,
    # this raises NameError. Confirm what serve() does and what should be
    # returned here.
    return app


app = main()