comparison light.py @ 14:e3dbd04dab96

add mqtt; talk to first light (no throttling)
author drewp@bigasterisk.com
date Sun, 28 Jan 2024 20:49:42 -0800
parents 1c865af058e7
children 61d4ccecfed8
comparison
equal deleted inserted replaced
13:1c865af058e7 14:e3dbd04dab96
1 import asyncio 1 import asyncio
2 import json
2 import logging 3 import logging
3 from dataclasses import dataclass 4 from dataclasses import dataclass
4 from typing import Callable 5 from typing import Callable
5 6
6 from color import Color 7 from color import Color
8 from mqtt_io import MqttIo
7 9
8 log = logging.getLogger('light') 10 log = logging.getLogger('light')
9 11
10 12
11 @dataclass(frozen=True) 13 @dataclass(frozen=True)
20 22
21 def summary(self) -> dict: 23 def summary(self) -> dict:
22 return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0]) 24 return dict([(k, round(v, 3)) for k, v in self.__dict__.items() if v > 0])
23 25
24 26
25 class Address: 27 class Transport:
26 28
27 def linked(self): 29 def linked(self):
28 return {'label': str(self)} 30 return {'label': str(self)}
29 31
32 async def send(self, dc: DeviceColor):
33 raise TypeError
30 34
31 class ZigbeeAddress(Address):
32 35
33 def __init__(self, name: str, ieee: str): 36 def zigbeeHexMessage(color: DeviceColor, bw=False) -> dict:
37 bright = max(color.r, color.g, color.b)
38 msg: dict = {"transition": 0, "brightness": int(255 * bright)}
39 if not bw:
40 c = "#%02x%02x%02x" % (int(color.r * 255), int(color.g * 255), int(color.b * 255))
41 msg["color"] = {"hex": c}
42 return msg
43
44
45 class ZigbeeTransport(Transport):
46
47 def __init__(self, mqtt: MqttIo, name: str, ieee: str):
48 self.mqtt = mqtt
34 self.name = name 49 self.name = name
35 self.ieee = ieee 50 self.ieee = ieee
36 51
37 def linked(self): 52 def linked(self):
38 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'} 53 return {'url': f'https://bigasterisk.com/zigbee/console/#/device/{self.ieee}/info', 'label': 'do-bar'}
39 54
55 async def send(self, dc: DeviceColor):
56 await self.mqtt.publish(f'zigbee/{self.name}/set', json.dumps(zigbeeHexMessage(dc, bw=False)))
57
40 58
41 @dataclass 59 @dataclass
42 class Light: 60 class Light:
43 name: str 61 name: str
44 address: Address 62 address: Transport
45 63
46 requestingColor: Color = Color.fromHex('#000000') 64 requestingColor: Color = Color.fromHex('#000000')
47 requestingDeviceColor: DeviceColor = DeviceColor() 65 requestingDeviceColor: DeviceColor = DeviceColor()
48 66
49 emittingColor: Color = Color.fromHex('#000000') 67 emittingColor: Color = Color.fromHex('#000000')
76 log.info(f'setColor from {self.requestingColor} to {c}') 94 log.info(f'setColor from {self.requestingColor} to {c}')
77 if c == self.requestingColor: 95 if c == self.requestingColor:
78 return 96 return
79 self.requestingColor = c 97 self.requestingColor = c
80 self.requestingDeviceColor = self.deviceColor(self.requestingColor) 98 self.requestingDeviceColor = self.deviceColor(self.requestingColor)
99
100 if self.notifyChanged:
101 self.notifyChanged()
102
103 # waits for the relevant round-trip
104 log.info(f'transport send {self.requestingDeviceColor}')
105 await self.address.send(self.requestingDeviceColor)
106
107 self.emittingColor = self.requestingColor
81 if self.notifyChanged: 108 if self.notifyChanged:
82 self.notifyChanged() 109 self.notifyChanged()
83 110
84 111
85 def makeZbBar(name: str, ieee: str) -> Light: 112 def makeZbBar(mqtt: MqttIo, name: str, ieee: str) -> Light:
86 return Light(name=name, address=ZigbeeAddress(name, ieee)) 113 return Light(name=name, address=ZigbeeTransport(mqtt, name, ieee))
87 114
88 115
89 class Lights: 116 class Lights:
90 _d: dict[str, Light] = {} 117 _d: dict[str, Light] = {}
91 118
92 def __init__(self): 119 def __init__(self, mqtt: MqttIo):
93 self.add(makeZbBar('do-bar', '0xa4c13844948d2da4')) 120 self.mqtt = mqtt
121 self.add(makeZbBar(mqtt, 'do-bar', '0xa4c13844948d2da4'))
94 122
95 def add(self, d: Light): 123 def add(self, d: Light):
96 d.notifyChanged = self.notifyChanged 124 d.notifyChanged = self.notifyChanged
97 self._d[d.name] = d 125 self._d[d.name] = d
98 126