Mercurial > code > home > repos > light-bridge
diff light.py @ 14:e3dbd04dab96
add mqtt; talk to first light (no throttling)
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 20:49:42 -0800 |
parents | 1c865af058e7 |
children | 61d4ccecfed8 |
line wrap: on
line diff
--- a/light.py Sun Jan 28 20:03:20 2024 -0800 +++ b/light.py Sun Jan 28 20:49:42 2024 -0800 @@ -1,9 +1,11 @@ import asyncio +import json import logging from dataclasses import dataclass from typing import Callable from color import Color +from mqtt_io import MqttIo log = logging.getLogger('light') @@ -22,26 +24,42 @@ return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0]) -class Address: +class Transport: def linked(self): return {'label': str(self)} + async def send(self, dc: DeviceColor): + raise TypeError -class ZigbeeAddress(Address): - def __init__(self, name: str, ieee: str): +def zigbeeHexMessage(color: DeviceColor, bw=False) -> dict: + bright = max(color.r, color.g, color.b) + msg: dict = {"transition": 0, "brightness": int(255 * bright)} + if not bw: + c = "#%02x%02x%02x" % (int(color.r * 255), int(color.g * 255), int(color.b * 255)) + msg["color"] = {"hex": c} + return msg + + +class ZigbeeTransport(Transport): + + def __init__(self, mqtt: MqttIo, name: str, ieee: str): + self.mqtt = mqtt self.name = name self.ieee = ieee def linked(self): return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'} + async def send(self, dc: DeviceColor): + await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc, bw=False))) + @dataclass class Light: name: str - address: Address + address: Transport requestingColor: Color = Color.fromHex('#000000') requestingDeviceColor: DeviceColor = DeviceColor() @@ -78,19 +96,29 @@ return self.requestingColor = c self.requestingDeviceColor = self.deviceColor(self.requestingColor) + + if self.notifyChanged: + self.notifyChanged() + + # waits for the relevant round-trip + log.info(f'transport send {self.requestingDeviceColor}') + await self.address.send(self.requestingDeviceColor) + + self.emittingColor = self.requestingColor if self.notifyChanged: self.notifyChanged() -def makeZbBar(name: str, ieee: str) -> Light: - return Light(name=name, address=ZigbeeAddress(name, ieee)) +def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light: + return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee)) class Lights: _d: dict[str, Light] = {} - def __init__(self): - self.add(makeZbBar('do-bar', '0xa4c13844948d2da4')) + def __init__(self, mqtt: MqttIo): + self.mqtt = mqtt + self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4')) def add(self, d: Light): d.notifyChanged = self.notifyChanged