view light.py @ 25:cee43f550577

add /lightNames
author drewp@bigasterisk.com
date Fri, 02 Feb 2024 20:52:09 -0800
parents 7d9a056e29fe
children 33b3eb24506e
line wrap: on
line source

import asyncio
import logging
from dataclasses import dataclass
from typing import Callable

from color import Color
from color_convert import DeviceColor, brightnessConv, ikeaWhiteConv, oneWhiteConv, relayConv, twoWhitesConv, zbConv
from mqtt_io import MqttIo
from protocols import ShellyGen1WebTransport, SonoffRelayTransport, TasmotaWebTransport, Transport, ZigbeeTransport, espColorMessage, zbBrightnessMessage, zbRelayMessage, zbWhiteSpectrumMessage

# Module-level logger shared by everything in this file; the channel name is 'lite'.
log = logging.getLogger('lite')


@dataclass
class Light:
    """State and control for one named light.

    Holds the color most recently requested, the device-level color obtained
    by running that request through `convertColor`, and the color recorded
    once `setColor` has completed. Device colors are handed to `transport`
    for delivery.
    """
    name: str
    transport: Transport
    convertColor: Callable[[Color], DeviceColor]

    # Color most recently passed to setColor.
    requestingColor: Color = Color.fromHex('#000000')
    # convertColor(requestingColor); recomputed in __post_init__ and setColor.
    requestingDeviceColor: DeviceColor = DeviceColor()

    # Set to requestingColor at the end of setColor (after any transport send).
    emittingColor: Color = Color.fromHex('#000000')
    # Reported by to_dict; never written by this class itself.
    online: bool | None = None
    latencyMs: float | None = None

    # Optional zero-argument callback that setColor invokes when state changes.
    notifyChanged: Callable | None = None

    def __post_init__(self):
        """Derive the initial device color from the initial requested color."""
        self.requestingDeviceColor = self.convertColor(self.requestingColor)

    def to_dict(self):
        """Return this light's state as ``{'light': {...}}`` with colors
        rendered via ``hex()``/``summary()`` and the address via
        ``transport.linked()``."""
        d = {
            'name': self.name,
            'address': self.transport.linked(),
            'requestingColor': self.requestingColor.hex(),
            'requestingDeviceColor': self.requestingDeviceColor.summary(),
            'emittingColor': self.emittingColor.hex(),
            'online': self.online,
            'latencyMs': self.latencyMs,
        }

        return {'light': d}

    async def setColor(self, c: Color):
        """Request color `c`, sending to the transport only if the converted
        device color actually differs from the one already requested.

        Does nothing when `c` equals the current requested color. Otherwise
        the request is recorded, `notifyChanged` (if set) is called before the
        send and again after `emittingColor` is updated. Any exception from
        `transport.send` propagates, leaving `emittingColor` unchanged.
        """
        if c == self.requestingColor:
            return
        # Remember the old request before overwriting it so the log line
        # below reports a real transition rather than the new color twice.
        previous = self.requestingColor
        self.requestingColor = c
        dc = self.convertColor(self.requestingColor)

        log.info(f'setColor from {previous} to {c} = {dc.summary()=}')
        if dc != self.requestingDeviceColor:
            self.requestingDeviceColor = dc

            if self.notifyChanged:
                self.notifyChanged()

            # waits for the relevant round-trip
            log.info(f'transport send {self.requestingDeviceColor.summary()}')
            await self.transport.send(self.requestingDeviceColor)

        self.emittingColor = self.requestingColor
        if self.notifyChanged:
            self.notifyChanged()


def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light:
    """Build a Light that converts with zbConv and sends through a
    ZigbeeTransport created from (mqtt, name, ieee)."""
    zigbee = ZigbeeTransport(mqtt, name, ieee)
    return Light(name=name, transport=zigbee, convertColor=zbConv)


def makeTasmota(name: str, hostname: str) -> Light:
    """Build a Light that converts with twoWhitesConv and sends through a
    TasmotaWebTransport for `hostname`."""
    web = TasmotaWebTransport(hostname)
    return Light(name=name, transport=web, convertColor=twoWhitesConv)


def makeShellyRGW2(name: str, hostname: str) -> Light:
    """Build a Light that converts with oneWhiteConv and sends through a
    ShellyGen1WebTransport for `hostname`."""
    web = ShellyGen1WebTransport(hostname)
    return Light(name=name, transport=web, convertColor=oneWhiteConv)


def makeSonoffRelay(mqtt: MqttIo, name: str, topic: str) -> Light:
    """Build a Light that converts with relayConv and sends through a
    SonoffRelayTransport created from (mqtt, topic)."""
    relay = SonoffRelayTransport(mqtt, topic)
    return Light(name=name, transport=relay, convertColor=relayConv)


def makeZbIkeaWhiteTemp(mqtt: MqttIo, name: str, ieee: str) -> Light:
    """Build a Light that converts with ikeaWhiteConv and sends through a
    ZigbeeTransport whose message builder is zbWhiteSpectrumMessage."""
    zigbee = ZigbeeTransport(mqtt, name, ieee, msg=zbWhiteSpectrumMessage)
    return Light(name=name, transport=zigbee, convertColor=ikeaWhiteConv)


def makeZbBrightness(mqtt: MqttIo, name: str, ieee: str) -> Light:
    """Build a Light that converts with brightnessConv and sends through a
    ZigbeeTransport whose message builder is zbBrightnessMessage."""
    zigbee = ZigbeeTransport(mqtt, name, ieee, msg=zbBrightnessMessage)
    return Light(name=name, transport=zigbee, convertColor=brightnessConv)


def makeZbRelay(mqtt: MqttIo, name: str, ieee: str) -> Light:
    """Build a Light that converts with relayConv and sends through a
    ZigbeeTransport whose message builder is zbRelayMessage."""
    zigbee = ZigbeeTransport(mqtt, name, ieee, msg=zbRelayMessage)
    return Light(name=name, transport=zigbee, convertColor=relayConv)


def makeEspBrightness(mqtt: MqttIo, name: str, topicPrefix: str) -> Light:
    """Build a Light that converts with brightnessConv and sends
    zbBrightnessMessage payloads through a ZigbeeTransport given an empty
    ieee string and a topic callback returning ``topicPrefix + '/command'``."""

    def commandTopic(*arg):
        # Ignores whatever the transport passes; the topic is fixed per light.
        return topicPrefix + '/command'

    transport = ZigbeeTransport(mqtt, name, '', topic=commandTopic, msg=zbBrightnessMessage)
    return Light(name=name, transport=transport, convertColor=brightnessConv)


def makeEspRgbw(mqtt: MqttIo, name: str, topicPrefix: str) -> Light:
    """Build a Light that converts with oneWhiteConv and sends
    espColorMessage payloads through a ZigbeeTransport given an empty ieee
    string and a topic callback returning ``topicPrefix + '/command'``."""

    def commandTopic(*arg):
        # Ignores whatever the transport passes; the topic is fixed per light.
        return topicPrefix + '/command'

    transport = ZigbeeTransport(mqtt, name, '', topic=commandTopic, msg=espColorMessage)
    return Light(name=name, transport=transport, convertColor=oneWhiteConv)


class Lights:
    """Registry of all Light objects, keyed by name, plus a simple
    change-notification stream (`changes`) driven by `notifyChanged`."""

    # Seconds changes() waits before yielding again even with no notification.
    _changePollSecs = 100

    def __init__(self, mqtt: MqttIo):
        """Create the registry and register the fixed set of lights.

        mqtt: connection handed to every MQTT-based light factory.
        """
        # todo: combine mqtt, aiohttp session, and pigpiod client into some
        # Transports object
        self.mqtt = mqtt

        # Per-instance state. These must exist before the first add(), which
        # calls notifyChanged(). Keeping the dict on the instance (not the
        # class) stops separate Lights objects from sharing one registry.
        self._d: dict[str, Light] = {}
        self._changeEvent = asyncio.Event()

        self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4'))
        self.add(makeTasmota('do-lamp', 'tasmota-9E2AB7-2743'))
        self.add(makeTasmota('li-high-shelf', 'light-li-ceil'))
        self.add(makeTasmota('tr-door', 'light-tr-door'))
        self.add(makeShellyRGW2('ki-ceiling', 'shellyrgbw2-e868e7f34c35'))
        self.add(makeShellyRGW2('ki-counter', 'shellyrgbw2-e868e7f34cb2'))

        self.add(makeZbIkeaWhiteTemp(mqtt, 'br-floor', '0x000b57fffedabd20'))
        self.add(makeZbIkeaWhiteTemp(mqtt, 'br-wall', '0x14b457fffe2dab6e'))
        self.add(makeZbIkeaWhiteTemp(mqtt, 'en', '0x000b57fffe988959'))
        self.add(makeZbIkeaWhiteTemp(mqtt, 'py', '0x000b57fffeaf42cd'))
        self.add(makeZbIkeaWhiteTemp(mqtt, 'rr-lamp', '0x000b57fffe32e5a5'))

        self.add(makeZbBrightness(mqtt, 'go-high', '0x847127fffebb3efa'))

        self.add(makeZbRelay(mqtt, 'ws-hanging', '0xd0cf5efffe720b46'))

        self.add(makeSonoffRelay(mqtt, 'li-lamp0', 'sonoff_0'))
        self.add(makeSonoffRelay(mqtt, 'li-lamp1', 'sonoff_1'))
        self.add(makeSonoffRelay(mqtt, 'li-lamp2', 'sonoff_2'))
        self.add(makeSonoffRelay(mqtt, 'li-lamp3', 'sonoff_3'))
        self.add(makeSonoffRelay(mqtt, 'li-lamp4', 'sonoff_4'))

        self.add(makeEspBrightness(mqtt, 'ws-high0', 'workshop/light/high0'))
        self.add(makeEspBrightness(mqtt, 'ws-high1', 'workshop/light/high1'))
        self.add(makeEspBrightness(mqtt, 'ws-high2', 'workshop/light/high2'))
        self.add(makeEspBrightness(mqtt, 'ws-high3', 'workshop/light/high3'))
        self.add(makeEspBrightness(mqtt, 'ws-kid', 'workshop/light/kid'))
        self.add(makeEspBrightness(mqtt, 'ws-sewing', 'workshop/light/sewing'))

        self.add(makeEspRgbw(mqtt, 'br-headboard', 'bed/light/headboard'))

        # ft-ceil
        # li-toys
        # sh-top
        # light-sh
        # ga-hanging

        # wled:
        #  string-tr
        #  string-hr
        #  light-tr-ball
        #  wled-ft-hanging

    def add(self, d: Light):
        """Register `d` under its name (replacing any light already using
        that name), route its change callback here, and signal a change."""
        d.notifyChanged = self.notifyChanged
        self._d[d.name] = d

        self.notifyChanged()

    def allNames(self) -> list[str]:
        """Return the registered light names in insertion order."""
        return list(self._d.keys())

    def byName(self, name: str) -> Light:
        """Return the light registered as `name`; raises KeyError if absent."""
        return self._d[name]

    def to_dict(self):
        """Return ``{'lights': [...]}`` with each light's to_dict(), sorted by name."""
        return {'lights': [d.to_dict() for d in sorted(self._d.values(), key=lambda r: r.name)]}

    # the following is bad. Get a better events lib.

    async def changes(self):  # yields None on any data change
        """Async generator: yield None immediately, then again after each
        notifyChanged() call or after `_changePollSecs` seconds of silence.

        Cancelling the consuming task propagates out of this generator
        instead of being swallowed.
        """
        while True:
            # Clear *before* yielding: the consumer reads current state on
            # this yield, so earlier notifications are already covered, while
            # any notification that arrives during the consumer's handling
            # leaves the event set and triggers an immediate follow-up yield.
            self._changeEvent.clear()
            log.info('Lights has a change')
            yield None
            try:
                await asyncio.wait_for(self._changeEvent.wait(), timeout=self._changePollSecs)
            except asyncio.TimeoutError:
                # No notification within the poll period; yield anyway.
                pass

    def notifyChanged(self):
        """Wake any changes() consumer; safe to call when nobody is waiting."""
        log.info('Lights.notifyChanged()')
        self._changeEvent.set()