Mercurial > code > home > repos > light-bridge
view protocols.py @ 20:24a574108365
more protocols; bugs in setColor
author | drewp@bigasterisk.com |
---|---|
date | Mon, 29 Jan 2024 11:52:43 -0800 |
parents | 61d4ccecfed8 |
children | b8201490c731 |
line wrap: on
line source
import logging import json import aiohttp from color_convert import DeviceColor from mqtt_io import MqttIo log = logging.getLogger('prot') class Transport: def linked(self): return {'label': str(self)} async def send(self, dc: DeviceColor): raise TypeError def zigbeeHexMessage(dc: DeviceColor) -> dict: msg: dict = {"transition": 0, "brightness": int(255 * dc.brightness)} c = "#%02x%02x%02x" % (int(dc.r * 255), int(dc.g * 255), int(dc.b * 255)) msg["color"] = {"hex": c} return msg class ZigbeeTransport(Transport): def __init__(self, mqtt: MqttIo, name: str, ieee: str): self.mqtt = mqtt self.name = name self.ieee = ieee def linked(self): return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': self.name} async def send(self, dc: DeviceColor): await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc))) class SonoffRelayTransport(Transport): def __init__(self, mqtt: MqttIo, name: str): self.mqtt = mqtt self.name = name def linked(self): return {'label': self.name} async def send(self, dc: DeviceColor): topic = f'{self.name}/switch/sonoff_basic_relay/command' msg = 'ON' if dc.brightness else 'OFF' log.info(f'sonoff {topic=} {msg=}') await self.mqtt.publish(topic, msg) class _WebTransport(Transport): def __init__(self, hostname: str): self.hostname = hostname self._session = aiohttp.ClientSession() def linked(self): return {'url': f'http://{self.hostname}/', 'label': self.hostname} class TasmotaWebTransport(_WebTransport): async def send(self, dc: DeviceColor): cmnd = 'Color ' + ','.join(str(int(x * 255)) for x in (dc.r, dc.g, dc.b, dc.cw, dc.ww)) async with self._session.get(f'http://{self.hostname}/cm', params={'cmnd': cmnd}) as resp: await resp.text() # {"POWER":"ON","Dimmer":21,"Color":"3636363600","HSBColor":"0,0,21","White":21,"CT":153,"Channel":[21,21,21,21,0]} class ShellyGen1WebTransport(_WebTransport): async def send(self, dc: DeviceColor): # also see https://shelly-api-docs.shelly.cloud/gen1/#shelly-rgbw2-color-status for metrics async with self._session.get(f'http://{self.hostname}/light/0', params={ 'red': int(dc.r * 255), 'green': int(dc.g * 255), 'blue': int(dc.b * 255), 'white': int(dc.w * 255), }) as resp: await resp.text() # {..."mode":"color","red":255,"green":242,"blue":0,"white":255,"gain":59,"effect":0,"transition":0,"power":18.00,"overpower":false}