Mercurial > code > home > repos > light-bridge
comparison light.py @ 17:1d15dc4d673f
cleanup color convert code path
author | drewp@bigasterisk.com |
---|---|
date | Sun, 28 Jan 2024 21:18:40 -0800 |
parents | 61d4ccecfed8 |
children | 24a574108365 |
comparison
equal
deleted
inserted
replaced
16:ab80e8826441 | 17:1d15dc4d673f |
---|---|
1 import asyncio | 1 import asyncio |
2 import json | |
3 import logging | 2 import logging |
4 from dataclasses import dataclass | 3 from dataclasses import dataclass |
5 from typing import Callable | 4 from typing import Callable |
6 | 5 |
7 from color import Color | 6 from color import Color |
| | 7 from color_convert import DeviceColor, zbConv |
8 from mqtt_io import MqttIo | 8 from mqtt_io import MqttIo |
| | 9 from protocols import Transport, ZigbeeTransport |
9 | 10 |
10 log = logging.getLogger('light') | 11 log = logging.getLogger('light') |
11 | 12 |
12 | 13 |
13 @dataclass | 14 @dataclass |
14 class Light: | 15 class Light: |
15 name: str | 16 name: str |
16 address: Transport | 17 transport: Transport |
| | 18 convertColor: Callable[[Color], DeviceColor] |
17 | 19 |
18 requestingColor: Color = Color.fromHex('#000000') | 20 requestingColor: Color = Color.fromHex('#000000') |
19 requestingDeviceColor: DeviceColor = DeviceColor() | 21 requestingDeviceColor: DeviceColor = DeviceColor() |
20 | 22 |
21 emittingColor: Color = Color.fromHex('#000000') | 23 emittingColor: Color = Color.fromHex('#000000') |
23 latencyMs: float | None = None | 25 latencyMs: float | None = None |
24 | 26 |
25 notifyChanged: Callable | None = None | 27 notifyChanged: Callable | None = None |
26 | 28 |
27 def __post_init__(self): | 29 def __post_init__(self): |
28 self.requestingDeviceColor = self.deviceColor(self.requestingColor) | 30 self.requestingDeviceColor = self.convertColor(self.requestingColor) |
29 | 31 |
30 def to_dict(self): | 32 def to_dict(self): |
31 d = { | 33 d = { |
32 'name': self.name, | 34 'name': self.name, |
33 'address': self.address.linked(), | 35 'address': self.transport.linked(), |
34 'requestingColor': self.requestingColor.hex(), | 36 'requestingColor': self.requestingColor.hex(), |
35 'requestingDeviceColor': self.requestingDeviceColor.summary(), | 37 'requestingDeviceColor': self.requestingDeviceColor.summary(), |
36 'emittingColor': self.emittingColor.hex(), | 38 'emittingColor': self.emittingColor.hex(), |
37 'online': self.online, | 39 'online': self.online, |
38 'latencyMs': self.latencyMs, | 40 'latencyMs': self.latencyMs, |
39 } | 41 } |
40 | 42 |
41 return {'light': d} | 43 return {'light': d} |
42 | 44 |
43 def deviceColor(self, c: Color) -> DeviceColor: | |
44 # do LUT here | |
45 return DeviceColor(r=c.r, g=c.g, b=c.b) | |
46 | |
47 async def setColor(self, c: Color): | 45 async def setColor(self, c: Color): |
48 log.info(f'setColor from {self.requestingColor} to {c}') | 46 log.info(f'setColor from {self.requestingColor} to {c}') |
49 if c == self.requestingColor: | 47 if c == self.requestingColor: |
50 return | 48 return |
51 self.requestingColor = c | 49 self.requestingColor = c |
52 self.requestingDeviceColor = self.deviceColor(self.requestingColor) | 50 self.requestingDeviceColor = self.convertColor(self.requestingColor) |
53 | 51 |
54 if self.notifyChanged: | 52 if self.notifyChanged: |
55 self.notifyChanged() | 53 self.notifyChanged() |
56 | 54 |
57 # waits for the relevant round-trip | 55 # waits for the relevant round-trip |
58 log.info(f'transport send {self.requestingDeviceColor}') | 56 log.info(f'transport send {self.requestingDeviceColor.summary()}') |
59 await self.address.send(self.requestingDeviceColor) | 57 await self.transport.send(self.requestingDeviceColor) |
60 | 58 |
61 self.emittingColor = self.requestingColor | 59 self.emittingColor = self.requestingColor |
62 if self.notifyChanged: | 60 if self.notifyChanged: |
63 self.notifyChanged() | 61 self.notifyChanged() |
64 | 62 |
65 | 63 |
66 def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light: | 64 def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light: |
67 return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee)) | 65 return Light(name=name, convertColor=zbConv, transport=ZigbeeTransport(mqtt, name, ieee)) |
68 | 66 |
69 | 67 |
70 class Lights: | 68 class Lights: |
71 _d: dict[str, Light] = {} | 69 _d: dict[str, Light] = {} |
72 | 70 |
73 def __init__(self, mqtt: MqttIo): | 71 def __init__(self, mqtt: MqttIo): |
74 self.mqtt = mqtt | 72 self.mqtt = mqtt |
| | 73 |
75 self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4')) | 74 self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4')) |
76 | 75 |
77 def add(self, d: Light): | 76 def add(self, d: Light): |
78 d.notifyChanged = self.notifyChanged | 77 d.notifyChanged = self.notifyChanged |
79 self._d[d.name] = d | 78 self._d[d.name] = d |