annotate protocols.py @ 15:61d4ccecfed8

rough refactor
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 21:18:01 -0800
parents
children 24a574108365
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
15
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
1 import json
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
2 from color_convert import DeviceColor
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
3 from mqtt_io import MqttIo
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
4
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
5
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
class Transport:
    """Base class for the ways a color can be delivered to a device.

    Subclasses override `send` (and may override `linked`); the base
    `send` always raises.
    """

    def linked(self):
        """Return a dict describing this transport.

        The base version has a single 'label' key holding `str(self)`.
        """
        return {'label': str(self)}

    async def send(self, dc: DeviceColor):
        """Deliver `dc` to the device; must be overridden by subclasses.

        Raises:
            TypeError: always, in this base class.
        """
        # Kept as TypeError (not NotImplementedError) so any existing
        # handlers still match; the message names the class that is missing
        # the override so the failure can be diagnosed.
        raise TypeError(f'{type(self).__name__} does not implement send()')
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
13
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
14
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
def _unitToByte(x: float) -> int:
    """Scale a nominal 0..1 value to an int, clamped to 0..255.

    Truncates (like `int()`), so in-range inputs give the same result as
    `int(x * 255)`.
    """
    return max(0, min(255, int(x * 255)))


def zigbeeHexMessage(dc: DeviceColor) -> dict:
    """Build the JSON-serializable 'set' payload for a color.

    Reads `dc.r`, `dc.g`, `dc.b` and `dc.brightness` and scales each by 255.

    Returns:
        A dict with 'transition' (always 0), 'brightness' (int 0..255) and
        'color' ({'hex': '#rrggbb'}).
    """
    # Clamp every channel: an out-of-range float would otherwise yield a
    # malformed hex string (three digits, or a '-' sign) or a brightness
    # outside 0..255.
    red, green, blue = (_unitToByte(ch) for ch in (dc.r, dc.g, dc.b))
    return {
        "transition": 0,
        "brightness": _unitToByte(dc.brightness),
        "color": {"hex": f"#{red:02x}{green:02x}{blue:02x}"},
    }
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
20
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
21
61d4ccecfed8 rough refactor
drewp@bigasterisk.com
parents:
diff changeset
class ZigbeeTransport(Transport):
    """Transport that sends colors to a Zigbee device by publishing on MQTT."""

    def __init__(self, mqtt: MqttIo, name: str, ieee: str):
        """Remember the MQTT client and the device's identifiers.

        Args:
            mqtt: client whose `publish` is awaited by `send`.
            name: device name, used in the MQTT topic.
            ieee: device address, used in the console URL from `linked`.
        """
        self.mqtt, self.name, self.ieee = mqtt, name, ieee

    def linked(self):
        """Return the console URL for this device plus a label."""
        consoleUrl = f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info'
        # NOTE(review): the label is the fixed string 'do-bar' for every
        # device -- possibly a placeholder for self.name; confirm.
        return {'url': consoleUrl, 'label': 'do-bar'}

    async def send(self, dc: DeviceColor):
        """Publish `dc` as JSON on the device's 'set' topic."""
        topic = f'zigbee/{self.name}/set'
        payload = json.dumps(zigbeeHexMessage(dc))
        await self.mqtt.publish(topic, payload)