view light.py @ 14:e3dbd04dab96

add mqtt; talk to first light (no throttling)
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 20:49:42 -0800
parents 1c865af058e7
children 61d4ccecfed8
line wrap: on
line source

import asyncio
import json
import logging
from dataclasses import dataclass
from typing import Callable

from color import Color
from mqtt_io import MqttIo

log = logging.getLogger('light')


@dataclass(frozen=True)
class DeviceColor:
    """neutral representation of the adjusted color that we send to a device"""
    r: float = 0
    g: float = 0
    b: float = 0
    w: float = 0
    x: float = 0
    y: float = 0

    def summary(self) -> dict:
        return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0])


class Transport:

    def linked(self):
        return {'label': str(self)}

    async def send(self, dc: DeviceColor):
        raise TypeError


def zigbeeHexMessage(color: DeviceColor, bw=False) -> dict:
    bright = max(color.r, color.g, color.b)
    msg: dict = {"transition": 0, "brightness": int(255 * bright)}
    if not bw:
        c = "#%02x%02x%02x" % (int(color.r * 255), int(color.g * 255), int(color.b * 255))
        msg["color"] = {"hex": c}
    return msg


class ZigbeeTransport(Transport):

    def __init__(self, mqtt: MqttIo, name: str, ieee: str):
        self.mqtt = mqtt
        self.name = name
        self.ieee = ieee

    def linked(self):
        return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'}

    async def send(self, dc: DeviceColor):
        await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc, bw=False)))


@dataclass
class Light:
    name: str
    address: Transport

    requestingColor: Color = Color.fromHex('#000000')
    requestingDeviceColor: DeviceColor = DeviceColor()

    emittingColor: Color = Color.fromHex('#000000')
    online: bool | None = None
    latencyMs: float | None = None

    notifyChanged: Callable | None = None

    def __post_init__(self):
        self.requestingDeviceColor = self.deviceColor(self.requestingColor)

    def to_dict(self):
        d = {
            'name': self.name,
            'address': self.address.linked(),
            'requestingColor': self.requestingColor.hex(),
            'requestingDeviceColor': self.requestingDeviceColor.summary(),
            'emittingColor': self.emittingColor.hex(),
            'online': self.online,
            'latencyMs': self.latencyMs,
        }

        return {'light': d}

    def deviceColor(self, c: Color) -> DeviceColor:
        # do LUT here
        return DeviceColor(r=c.r, g=c.g, b=c.b)

    async def setColor(self, c: Color):
        log.info(f'setColor from {self.requestingColor} to {c}')
        if c == self.requestingColor:
            return
        self.requestingColor = c
        self.requestingDeviceColor = self.deviceColor(self.requestingColor)

        if self.notifyChanged:
            self.notifyChanged()

        # waits for the relevant round-trip
        log.info(f'transport  send {self.requestingDeviceColor}')
        await self.address.send(self.requestingDeviceColor)

        self.emittingColor = self.requestingColor
        if self.notifyChanged:
            self.notifyChanged()


def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light:
    return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee))


class Lights:
    _d: dict[str, Light] = {}

    def __init__(self, mqtt: MqttIo):
        self.mqtt = mqtt
        self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4'))

    def add(self, d: Light):
        d.notifyChanged = self.notifyChanged
        self._d[d.name] = d

        self.notifyChanged()

    def byName(self, name: str) -> Light:
        return self._d[name]

    def to_dict(self):
        return {'lights': [d.to_dict() for d in sorted(self._d.values(), key=lambda r: r.name)]}

    # the following is bad. Get a better events lib.
    _changeSleepTask: asyncio.Task | None = None

    async def changes(self):  # yields None on any data change
        while True:
            log.info('Lights has a change')
            yield None
            self._changeSleepTask = asyncio.create_task(self._sleep())
            try:
                await self._changeSleepTask
            except asyncio.CancelledError:
                pass
            self._changeSleepTask = None

    async def _sleep(self):
        await asyncio.sleep(100)

    def notifyChanged(self):
        log.info('Lights.notifyChanged()')
        if self._changeSleepTask is not None:
            self._changeSleepTask.cancel()