view protocols.py @ 15:61d4ccecfed8

rough refactor
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 21:18:01 -0800
parents
children 24a574108365
line wrap: on
line source

import json
from color_convert import DeviceColor
from mqtt_io import MqttIo


class Transport:

    def linked(self):
        return {'label': str(self)}

    async def send(self, dc: DeviceColor):
        raise TypeError


def zigbeeHexMessage(dc: DeviceColor) -> dict:
    msg: dict = {"transition": 0, "brightness": int(255 * dc.brightness)}
    c = "#%02x%02x%02x" % (int(dc.r * 255), int(dc.g * 255), int(dc.b * 255))
    msg["color"] = {"hex": c}
    return msg


class ZigbeeTransport(Transport):

    def __init__(self, mqtt: MqttIo, name: str, ieee: str):
        self.mqtt = mqtt
        self.name = name
        self.ieee = ieee

    def linked(self):
        return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'}

    async def send(self, dc: DeviceColor):
        await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc)))