comparison convert.py @ 10:2a507e679d0d default tip

add zigbee converters
author drewp@bigasterisk.com
date Mon, 12 Aug 2024 13:15:07 -0700
parents bc2a93b306e9
children
comparison
equal deleted inserted replaced
9:789042521535 10:2a507e679d0d
1 ''' 1 '''
2 Note that these functions need to parse the message['payload'] into a float 2 Note that these functions need to parse the message['payload'] into a float
3 ''' 3 '''
4 import json 4 import json
5 import logging
5 6
7 log = logging.getLogger('conv')
6 converters = [] 8 converters = []
7 9
8 10
def f_from_c(c):
    """Return the Fahrenheit equivalent of the Celsius temperature ``c``."""
    scaled = c * 9 / 5
    return scaled + 32
71 { 'name': 'powermeter_voltage_v', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['voltage'] }, 73 { 'name': 'powermeter_voltage_v', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['voltage'] },
72 { 'name': 'powermeter_current_a', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['current'] }, 74 { 'name': 'powermeter_current_a', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['current'] },
73 { 'name': 'powermeter_apparent_power_va', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['apower'] }, 75 { 'name': 'powermeter_apparent_power_va', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['apower'] },
74 { 'name': 'powermeter_temp_f', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': f_from_c(msg['temperature']['tC']) }, 76 { 'name': 'powermeter_temp_f', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': f_from_c(msg['temperature']['tC']) },
75 ] # yapf: disable 77 ] # yapf: disable
78
@topic(r'^zigbee/([^/]+)$')
def zigbee(message, topicGroups: tuple[str]) -> list[dict]:
    """Convert a ``zigbee/<dev>`` message into a list of metric dicts.

    ``message['payload']`` is parsed as JSON and is expected to be an
    object. One metric named ``zigbee_<key>`` is emitted, labelled with the
    device name taken from the topic, for each of these keys that is present
    in the payload: update_available, link_quality, battery, occupancy,
    brightness, contact.

    Args:
        message: mapping with a JSON-encoded ``'payload'`` entry.
        topicGroups: regex groups from the topic match; the first group is
            the device name.

    Returns:
        A list of ``{'name', 'labels', 'value'}`` dicts (possibly empty).
    """
    dev = topicGroups[0]
    body = json.loads(message['payload'])
    ret = []
    for v in ['update_available', 'link_quality', 'battery', 'occupancy', 'brightness', 'contact']:
        # The payload spells this one key without the underscore used in the
        # metric name.
        body_key = 'linkquality' if v == 'link_quality' else v
        if body_key not in body:
            continue
        value = body[body_key]
        if v in ['update_available', 'contact', 'occupancy']:
            # Flag-like keys are reported as 0/1; a None value becomes 0 here.
            value = int(bool(value))
        if v == 'brightness' and body.get('state', None) != 'ON':
            # Any state other than 'ON' (including a missing state) reports
            # brightness 0, whatever brightness the payload carries.
            value = 0
        if v == 'link_quality' and value is None:
            # A present-but-null link quality is reported as -1.
            value = -1
        if value is None:
            # Only battery/brightness can still be None at this point. Skip
            # the metric, and report it through the module logger rather than
            # the root logger so the 'conv' logger's configuration applies.
            log.warning(f'zigbee {v=} {value=}')
            continue
        ret.append({'name': 'zigbee_' + v, 'labels': [{'labelName': 'dev', 'labelValue': dev}], 'value': value})
    return ret
100
101
@topic(r'^zigbee/([^/]+)/availability$')
def zigbee_availability(message, topicGroups: tuple[str]) -> list[dict]:
    """Convert a ``zigbee/<dev>/availability`` message into metric dicts.

    Always emits ``zigbee_online`` (1 when the payload equals "online",
    otherwise 0). When the device is online and its name starts with '0x',
    an additional ``zigbee_unnamed_dev`` metric with value 1 is emitted.
    """
    dev = topicGroups[0]

    def _metric(name, value):
        # Build a fresh labels list per metric so the dicts share no state.
        return {'name': name, 'labels': [{'labelName': 'dev', 'labelValue': dev}], 'value': value}

    is_online = message['payload'] == "online"
    metrics = [_metric('zigbee_online', int(is_online))]
    if not is_online:
        return metrics
    if dev.startswith('0x'):
        metrics.append(_metric('zigbee_unnamed_dev', 1))
    return metrics
109 return ret