Mercurial > code > home > repos > light-bridge
annotate protocols.py @ 26:33b3eb24506e
wled (single bulb) support. more lights
author | drewp@bigasterisk.com |
---|---|
date | Sat, 03 Feb 2024 20:56:27 -0800 |
parents | 7d9a056e29fe |
children | fb2e91f230f4 |
rev | line source |
---|---|
20 | 1 import logging |
15 | 2 import json |
20 | 3 import aiohttp |
15 | 4 from color_convert import DeviceColor |
5 from mqtt_io import MqttIo | |
6 | |
20 | 7 log = logging.getLogger('prot') |
15 | 8 |
21 | 9 |
15 | 10 class Transport: |
11 | |
12 def linked(self): | |
13 return {'label': str(self)} | |
14 | |
15 async def send(self, dc: DeviceColor): | |
16 raise TypeError | |
17 | |
18 | |
21 | 19 def to8(x: float): |
20 return int(x * 255) | |
21 | |
22 | |
23 def zbColorMessage(dc: DeviceColor) -> dict: | |
24 return { | |
25 "transition": 0, | |
26 "brightness": to8(dc.brightness), | |
27 "color": { | |
28 "hex": "#%02x%02x%02x" % (to8(dc.r), to8(dc.g), to8(dc.b)) | |
29 }, | |
30 } | |
31 | |
15 | 32 |
21 | 33 def zbBrightnessMessage(dc: DeviceColor) -> dict: |
34 return { | |
35 "transition": 0, | |
36 "brightness": to8(dc.brightness), | |
37 } | |
38 | |
39 | |
40 def zbWhiteSpectrumMessage(dc: DeviceColor) -> dict: | |
41 return { | |
42 "transition": 0, | |
43 "brightness": to8(dc.brightness), | |
44 # temperature todo | |
45 } | |
46 | |
23 | 47 |
21 | 48 def zbRelayMessage(dc: DeviceColor) -> dict: |
49 return {'state': 'ON' if dc.brightness else 'OFF'} | |
50 | |
23 | 51 |
52 def espColorMessage(dc: DeviceColor) -> dict: | |
53 return { | |
26
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
54 "state": 'ON', |
23 | 55 "color": { |
56 "r": to8(dc.r), | |
57 "g": to8(dc.g), | |
58 "b": to8(dc.b), | |
59 "w": to8(dc.w) | |
60 }, | |
61 } | |
62 | |
63 | |
21 | 64 def z2mSet(name): |
65 return f'zigbee/{name}/set' | |
15 | 66 |
23 | 67 |
15 | 68 class ZigbeeTransport(Transport): |
69 | |
21 | 70 def __init__(self, mqtt: MqttIo, name: str, ieee: str, topic=z2mSet, msg=zbColorMessage): |
15 | 71 self.mqtt = mqtt |
72 self.name = name | |
73 self.ieee = ieee | |
23 | 74 self.topic = topic |
21 | 75 self.msg = msg |
15 | 76 |
77 def linked(self): | |
20 | 78 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': self.name} |
15 | 79 |
80 async def send(self, dc: DeviceColor): | |
21 | 81 await self.mqtt.publish(self.topic(self.name), json.dumps(self.msg(dc))) |
20 | 82 |
83 | |
84 class SonoffRelayTransport(Transport): | |
85 | |
86 def __init__(self, mqtt: MqttIo, name: str): | |
87 self.mqtt = mqtt | |
88 self.name = name | |
89 | |
90 def linked(self): | |
91 return {'label': self.name} | |
92 | |
93 async def send(self, dc: DeviceColor): | |
94 topic = f'{self.name}/switch/sonoff_basic_relay/command' | |
95 msg = 'ON' if dc.brightness else 'OFF' | |
96 log.info(f'sonoff {topic=} {msg=}') | |
97 await self.mqtt.publish(topic, msg) | |
98 | |
99 | |
100 class _WebTransport(Transport): | |
101 | |
102 def __init__(self, hostname: str): | |
103 self.hostname = hostname | |
104 self._session = aiohttp.ClientSession() | |
105 | |
106 def linked(self): | |
107 return {'url': f'http://{self.hostname}/', 'label': self.hostname} | |
108 | |
109 | |
110 class TasmotaWebTransport(_WebTransport): | |
111 | |
112 async def send(self, dc: DeviceColor): | |
23 | 113 cmnd = 'Color ' + ','.join(str(to8(x)) for x in (dc.r, dc.g, dc.b, dc.cw, dc.ww)) |
20 | 114 async with self._session.get(f'http://{self.hostname}/cm', params={'cmnd': cmnd}) as resp: |
115 await resp.text() | |
116 # {"POWER":"ON","Dimmer":21,"Color":"3636363600","HSBColor":"0,0,21","White":21,"CT":153,"Channel":[21,21,21,21,0]} | |
117 | |
118 | |
26
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
119 class WledTransport(_WebTransport): |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
120 |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
121 async def send(self, dc: DeviceColor): |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
122 # see https://kno.wled.ge/interfaces/json-api/ |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
123 msg = { |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
124 'seg': [{ |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
125 "cct": 128, # 0 warm; 255 cool |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
126 "col": [[to8(x) for x in [dc.r, dc.g, dc.b, dc.cw]], [], []] |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
127 }] |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
128 } |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
129 async with self._session.post(f'http://{self.hostname}/json/state', headers={'content-type': 'application/json'}, data=json.dumps(msg)) as resp: |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
130 await resp.text() |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
131 # {"POWER":"ON","Dimmer":21,"Color":"3636363600","HSBColor":"0,0,21","White":21,"CT":153,"Channel":[21,21,21,21,0]} |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
132 |
33b3eb24506e
wled (single bulb) support. more lights
drewp@bigasterisk.com
parents:
23
diff
changeset
|
133 |
20 | 134 class ShellyGen1WebTransport(_WebTransport): |
135 | |
136 async def send(self, dc: DeviceColor): | |
137 # also see https://shelly-api-docs.shelly.cloud/gen1/#shelly-rgbw2-color-status for metrics | |
23 | 138 async with self._session.get(f'http://{self.hostname}/light/0', params={ |
139 'red': to8(dc.r), | |
140 'green': to8(dc.g), | |
141 'blue': to8(dc.b), | |
142 'white': to8(dc.w), | |
143 }) as resp: | |
20 | 144 await resp.text() |
145 # {..."mode":"color","red":255,"green":242,"blue":0,"white":255,"gain":59,"effect":0,"transition":0,"power":18.00,"overpower":false} |