comparison protocols.py @ 21:b8201490c731

more light types
author drewp@bigasterisk.com
date Mon, 29 Jan 2024 12:27:08 -0800
parents 24a574108365
children 7d9a056e29fe
comparison
equal deleted inserted replaced
20:24a574108365 21:b8201490c731
3 import aiohttp 3 import aiohttp
4 from color_convert import DeviceColor 4 from color_convert import DeviceColor
5 from mqtt_io import MqttIo 5 from mqtt_io import MqttIo
6 6
7 log = logging.getLogger('prot') 7 log = logging.getLogger('prot')
8
8 9
9 class Transport: 10 class Transport:
10 11
11 def linked(self): 12 def linked(self):
12 return {'label': str(self)} 13 return {'label': str(self)}
13 14
14 async def send(self, dc: DeviceColor): 15 async def send(self, dc: DeviceColor):
15 raise TypeError 16 raise TypeError
16 17
17 18
18 def zigbeeHexMessage(dc: DeviceColor) -> dict: 19 def to8(x: float):
19 msg: dict = {"transition": 0, "brightness": int(255 * dc.brightness)} 20 return int(x * 255)
20 c = "#%02x%02x%02x" % (int(dc.r * 255), int(dc.g * 255), int(dc.b * 255))
21 msg["color"] = {"hex": c}
22 return msg
23 21
22
23 def zbColorMessage(dc: DeviceColor) -> dict:
24 return {
25 "transition": 0,
26 "brightness": to8(dc.brightness),
27 "color": {
28 "hex": "#%02x%02x%02x" % (to8(dc.r), to8(dc.g), to8(dc.b))
29 },
30 }
31
32
33 def zbBrightnessMessage(dc: DeviceColor) -> dict:
34 return {
35 "transition": 0,
36 "brightness": to8(dc.brightness),
37 }
38
39
40 def zbWhiteSpectrumMessage(dc: DeviceColor) -> dict:
41 return {
42 "transition": 0,
43 "brightness": to8(dc.brightness),
44 # temperature todo
45 }
46
47 def zbRelayMessage(dc: DeviceColor) -> dict:
48 return {'state': 'ON' if dc.brightness else 'OFF'}
49
50 def z2mSet(name):
51 return f'zigbee/{name}/set'
24 52
25 class ZigbeeTransport(Transport): 53 class ZigbeeTransport(Transport):
26 54
27 def __init__(self, mqtt: MqttIo, name: str, ieee: str): 55 def __init__(self, mqtt: MqttIo, name: str, ieee: str, topic=z2mSet, msg=zbColorMessage):
28 self.mqtt = mqtt 56 self.mqtt = mqtt
29 self.name = name 57 self.name = name
30 self.ieee = ieee 58 self.ieee = ieee
59 self.topic=topic
60 self.msg = msg
31 61
32 def linked(self): 62 def linked(self):
33 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': self.name} 63 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': self.name}
34 64
35 async def send(self, dc: DeviceColor): 65 async def send(self, dc: DeviceColor):
36 await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc))) 66 await self.mqtt.publish(self.topic(self.name), json.dumps(self.msg(dc)))
37 67
38 68
39 class SonoffRelayTransport(Transport): 69 class SonoffRelayTransport(Transport):
40 70
41 def __init__(self, mqtt: MqttIo, name: str): 71 def __init__(self, mqtt: MqttIo, name: str):