diff light.py @ 14:e3dbd04dab96

add mqtt; talk to first light (no throttling)
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 20:49:42 -0800
parents 1c865af058e7
children 61d4ccecfed8
line wrap: on
line diff
--- a/light.py	Sun Jan 28 20:03:20 2024 -0800
+++ b/light.py	Sun Jan 28 20:49:42 2024 -0800
@@ -1,9 +1,11 @@
 import asyncio
+import json
 import logging
 from dataclasses import dataclass
 from typing import Callable
 
 from color import Color
+from mqtt_io import MqttIo
 
 log = logging.getLogger('light')
 
@@ -22,26 +24,42 @@
         return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0])
 
 
-class Address:
+class Transport:
 
     def linked(self):
         return {'label': str(self)}
 
+    async def send(self, dc: DeviceColor):
+        raise TypeError
 
-class ZigbeeAddress(Address):
 
-    def __init__(self, name: str, ieee: str):
+def zigbeeHexMessage(color: DeviceColor, bw=False) -> dict:
+    bright = max(color.r, color.g, color.b)
+    msg: dict = {"transition": 0, "brightness": int(255 * bright)}
+    if not bw:
+        c = "#%02x%02x%02x" % (int(color.r * 255), int(color.g * 255), int(color.b * 255))
+        msg["color"] = {"hex": c}
+    return msg
+
+
+class ZigbeeTransport(Transport):
+
+    def __init__(self, mqtt: MqttIo, name: str, ieee: str):
+        self.mqtt = mqtt
         self.name = name
         self.ieee = ieee
 
     def linked(self):
         return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'}
 
+    async def send(self, dc: DeviceColor):
+        await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc, bw=False)))
+
 
 @dataclass
 class Light:
     name: str
-    address: Address
+    address: Transport
 
     requestingColor: Color = Color.fromHex('#000000')
     requestingDeviceColor: DeviceColor = DeviceColor()
@@ -78,19 +96,29 @@
             return
         self.requestingColor = c
         self.requestingDeviceColor = self.deviceColor(self.requestingColor)
+
+        if self.notifyChanged:
+            self.notifyChanged()
+
+        # waits for the relevant round-trip
+        log.info(f'transport  send {self.requestingDeviceColor}')
+        await self.address.send(self.requestingDeviceColor)
+
+        self.emittingColor = self.requestingColor
         if self.notifyChanged:
             self.notifyChanged()
 
 
-def makeZbBar(name: str, ieee: str) -> Light:
-    return Light(name=name, address=ZigbeeAddress(name, ieee))
+def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light:
+    return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee))
 
 
 class Lights:
     _d: dict[str, Light] = {}
 
-    def __init__(self):
-        self.add(makeZbBar('do-bar', '0xa4c13844948d2da4'))
+    def __init__(self, mqtt: MqttIo):
+        self.mqtt = mqtt
+        self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4'))
 
     def add(self, d: Light):
         d.notifyChanged = self.notifyChanged