15
|
1 import json
|
|
2 from color_convert import DeviceColor
|
|
3 from mqtt_io import MqttIo
|
|
4
|
|
5
|
|
class Transport:
    """Base class for something that can deliver a DeviceColor to a device.

    The base `send` always raises, so subclasses are expected to override it.
    """

    def linked(self):
        """Return a metadata dict whose 'label' is this object's `str()`."""
        label = str(self)
        return dict(label=label)

    async def send(self, dc: DeviceColor):
        """Deliver `dc` to the device.

        Raises:
            TypeError: always, in this base implementation.
        """
        raise TypeError()
|
|
13
|
|
14
|
|
def _unit_to_byte(value: float) -> int:
    """Scale a fraction of full scale to an int in 0..255.

    Input is clamped to [0.0, 1.0] before scaling so the result always fits
    in two hex digits; in-range values are truncated exactly as `int(255 * v)`.
    """
    return int(255 * min(1.0, max(0.0, value)))


def zigbeeHexMessage(dc: DeviceColor) -> dict:
    """Build the message dict for setting a zigbee light to `dc`.

    Args:
        dc: color whose `r`, `g`, `b` and `brightness` attributes are read
            and treated as fractions of full scale (multiplied by 255).

    Returns:
        A dict with "transition" (always 0), "brightness" (int 0..255) and
        "color" ({"hex": "#rrggbb"}), in that key order.
    """
    # Clamp every channel: an unclamped value such as 1.2 or -0.1 would
    # format as "132" or "-19" and produce a malformed "#rrggbb" string.
    red, green, blue = (_unit_to_byte(ch) for ch in (dc.r, dc.g, dc.b))
    return {
        "transition": 0,
        "brightness": _unit_to_byte(dc.brightness),
        "color": {"hex": f"#{red:02x}{green:02x}{blue:02x}"},
    }
|
|
20
|
|
21
|
|
class ZigbeeTransport(Transport):
    """Transport that publishes color messages for one zigbee device via MQTT."""

    def __init__(self, mqtt: MqttIo, name: str, ieee: str):
        """Store the MQTT client, the device name and the device's ieee id.

        `name` is used in the publish topic; `ieee` is used in the console URL.
        """
        self.mqtt, self.name, self.ieee = mqtt, name, ieee

    def linked(self):
        """Return a dict with the device's console 'url' and a 'label'."""
        console_url = f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info'
        # NOTE(review): the label is a fixed string, not derived from
        # self.name — confirm this is intended.
        return {'url': console_url, 'label': 'do-bar'}

    async def send(self, dc: DeviceColor):
        """Publish the JSON-encoded hex color message for `dc` to this device's set topic."""
        topic = f'zigbee/{self.name}/set'
        payload = json.dumps(zigbeeHexMessage(dc))
        await self.mqtt.publish(topic, payload)
|