Mercurial > code > home > repos > light-bridge
comparison light.py @ 14:e3dbd04dab96
add mqtt; talk to first light (no throttling)
field | value |
---|---|
author | drewp@bigasterisk.com |
date | Sun, 28 Jan 2024 20:49:42 -0800 |
parents | 1c865af058e7 |
children | 61d4ccecfed8 |
comparison
legend (row status): equal, deleted, inserted, replaced
13:1c865af058e7 | 14:e3dbd04dab96 |
---|---|
1 import asyncio | 1 import asyncio |
2 import json | |
2 import logging | 3 import logging |
3 from dataclasses import dataclass | 4 from dataclasses import dataclass |
4 from typing import Callable | 5 from typing import Callable |
5 | 6 |
6 from color import Color | 7 from color import Color |
8 from mqtt_io import MqttIo | |
7 | 9 |
8 log = logging.getLogger('light') | 10 log = logging.getLogger('light') |
9 | 11 |
10 | 12 |
11 @dataclass(frozen=True) | 13 @dataclass(frozen=True) |
20 | 22 |
21 def summary(self) -> dict: | 23 def summary(self) -> dict: |
22 return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0]) | 24 return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0]) |
23 | 25 |
24 | 26 |
25 class Address: | 27 class Transport: |
26 | 28 |
27 def linked(self): | 29 def linked(self): |
28 return {'label': str(self)} | 30 return {'label': str(self)} |
29 | 31 |
32 async def send(self, dc: DeviceColor): | |
33 raise TypeError | |
30 | 34 |
31 class ZigbeeAddress(Address): | |
32 | 35 |
33 def __init__(self, name: str, ieee: str): | 36 def zigbeeHexMessage(color: DeviceColor, bw=False) -> dict: |
37 bright = max(color.r, color.g, color.b) | |
38 msg: dict = {"transition": 0, "brightness": int(255 * bright)} | |
39 if not bw: | |
40 c = "#%02x%02x%02x" % (int(color.r * 255), int(color.g * 255), int(color.b * 255)) | |
41 msg["color"] = {"hex": c} | |
42 return msg | |
43 | |
44 | |
45 class ZigbeeTransport(Transport): | |
46 | |
47 def __init__(self, mqtt: MqttIo, name: str, ieee: str): | |
48 self.mqtt = mqtt | |
34 self.name = name | 49 self.name = name |
35 self.ieee = ieee | 50 self.ieee = ieee |
36 | 51 |
37 def linked(self): | 52 def linked(self): |
38 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'} | 53 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'} |
39 | 54 |
55 async def send(self, dc: DeviceColor): | |
56 await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc, bw=False))) | |
57 | |
40 | 58 |
41 @dataclass | 59 @dataclass |
42 class Light: | 60 class Light: |
43 name: str | 61 name: str |
44 address: Address | 62 address: Transport |
45 | 63 |
46 requestingColor: Color = Color.fromHex('#000000') | 64 requestingColor: Color = Color.fromHex('#000000') |
47 requestingDeviceColor: DeviceColor = DeviceColor() | 65 requestingDeviceColor: DeviceColor = DeviceColor() |
48 | 66 |
49 emittingColor: Color = Color.fromHex('#000000') | 67 emittingColor: Color = Color.fromHex('#000000') |
76 log.info(f'setColor from {self.requestingColor} to {c}') | 94 log.info(f'setColor from {self.requestingColor} to {c}') |
77 if c == self.requestingColor: | 95 if c == self.requestingColor: |
78 return | 96 return |
79 self.requestingColor = c | 97 self.requestingColor = c |
80 self.requestingDeviceColor = self.deviceColor(self.requestingColor) | 98 self.requestingDeviceColor = self.deviceColor(self.requestingColor) |
99 | |
100 if self.notifyChanged: | |
101 self.notifyChanged() | |
102 | |
103 # waits for the relevant round-trip | |
104 log.info(f'transport send {self.requestingDeviceColor}') | |
105 await self.address.send(self.requestingDeviceColor) | |
106 | |
107 self.emittingColor = self.requestingColor | |
81 if self.notifyChanged: | 108 if self.notifyChanged: |
82 self.notifyChanged() | 109 self.notifyChanged() |
83 | 110 |
84 | 111 |
85 def makeZbBar(name: str, ieee: str) -> Light: | 112 def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light: |
86 return Light(name=name, address=ZigbeeAddress(name, ieee)) | 113 return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee)) |
87 | 114 |
88 | 115 |
89 class Lights: | 116 class Lights: |
90 _d: dict[str, Light] = {} | 117 _d: dict[str, Light] = {} |
91 | 118 |
92 def __init__(self): | 119 def __init__(self, mqtt: MqttIo): |
93 self.add(makeZbBar('do-bar', '0xa4c13844948d2da4')) | 120 self.mqtt = mqtt |
121 self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4')) | |
94 | 122 |
95 def add(self, d: Light): | 123 def add(self, d: Light): |
96 d.notifyChanged = self.notifyChanged | 124 d.notifyChanged = self.notifyChanged |
97 self._d[d.name] = d | 125 self._d[d.name] = d |
98 | 126 |