Mercurial > code > home > repos > mqtt_metrics
view convert.py @ 9:789042521535
crash on regexp syntax errors
author | drewp@bigasterisk.com |
---|---|
date | Mon, 12 Aug 2024 13:14:39 -0700 |
parents | bc2a93b306e9 |
children | 2a507e679d0d |
line wrap: on
line source
'''Converters that turn MQTT messages into lists of metric samples.

Every converter is registered with `@topic(pattern)` and takes
`(message, topicGroups)`. It returns a list of dicts shaped like

    {'name': str,
     'labels': [{'labelName': str, 'labelValue': str}, ...],
     'value': number}

Note that these functions need to parse the message['payload'] into a float
(or, for JSON payloads, pull numeric fields out of the decoded object).
'''
import json

# Every function decorated with @topic, in definition order.
converters = []


def f_from_c(c):
    '''Convert a temperature from degrees Celsius to degrees Fahrenheit.'''
    return c * 9 / 5 + 32


def topic(topic_pattern: str):
    '''Return a decorator that registers a converter for `topic_pattern`.

    The decorator stores the pattern on the function as `topic_pattern`,
    appends the function to the module-level `converters` list and returns
    the function unchanged. The pattern is not compiled or validated here.
    '''

    def register(func):
        func.topic_pattern = topic_pattern
        converters.append(func)
        return func

    return register


def _sample(name, labels, value):
    '''Build one metric dict; `labels` maps label name -> label value.'''
    return {
        'name': name,
        'labels': [{
            'labelName': label_name,
            'labelValue': label_value,
        } for label_name, label_value in labels.items()],
        'value': value,
    }


@topic(r'([^-]+)-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
def pm(message, topicGroups: tuple[str]) -> list[dict]:
    '''Report the payload as `air_quality_pm` with size label '10'.

    `topicGroups[0]` becomes the `location` label; `message['payload']` is
    parsed with `float()`, so a non-numeric payload raises ValueError.
    '''
    return [
        _sample(
            'air_quality_pm',
            {'location': topicGroups[0], 'size': '10'},
            float(message['payload']),
        )
    ]


@topic(r'([^-]+)-air-quality/sensor/air_temperature_c/state')
def air_temp(message, topicGroups: tuple[str]) -> list[dict]:
    '''Report the payload, converted from Celsius, as `air_temperature_f`.

    `topicGroups[0]` becomes the `location` label; `message['payload']` is
    parsed with `float()`, so a non-numeric payload raises ValueError.
    '''
    return [
        _sample(
            'air_temperature_f',
            {'location': topicGroups[0]},
            f_from_c(float(message['payload'])),
        )
    ]


# Full topic string -> value of the `sensor` label.
_SONOFF_SENSOR_NAMES = {
    'tele/st-wall-power/SENSOR': 'st_wall',
    'tele/ki-fridge/SENSOR': 'ki_fridge',
    'tt-console-power/SENSOR': 'tt_console',
}

# (metric name, key inside the payload's 'ENERGY' object), in output order.
_SONOFF_FIELDS = (
    ('powermeter_w', 'Power'),
    ('powermeter_apparent_power_va', 'ApparentPower'),
    ('powermeter_reactive_power_var', 'ReactivePower'),
    ('powermeter_factor', 'Factor'),
    ('powermeter_voltage_v', 'Voltage'),
    ('powermeter_current_a', 'Current'),
)


@topic(r'(tele/st-wall-power/SENSOR|tele/ki-fridge/SENSOR|tt-console-power/SENSOR)')
def poorly_named_sonoff_powermeter(message, topicGroups: tuple[str]) -> list[dict]:
    '''Report six power metrics from the JSON payload's 'ENERGY' object.

    `topicGroups[0]` must be one of the three full topic strings in the
    pattern; it is mapped to a short `sensor` label. Raises KeyError if the
    topic is not in that mapping or if 'ENERGY' or any expected field is
    missing from the payload.
    '''
    energy = json.loads(message['payload'])['ENERGY']
    sensor = _SONOFF_SENSOR_NAMES[topicGroups[0]]
    return [
        _sample(metric, {'sensor': sensor}, energy[field])
        for metric, field in _SONOFF_FIELDS
    ]


@topic(r'(.*)-power/status/switch:0')
def shelly_powermeter(message, topicGroups: tuple[str]) -> list[dict]:
    '''Report power, voltage, current, apparent power and temperature.

    The `sensor` label is `topicGroups[0]` with '-' replaced by '_'. The JSON
    payload must contain 'apower', 'voltage', 'current' and
    'temperature' -> 'tC'; a missing key raises KeyError.
    '''
    sensor = topicGroups[0].replace('-', '_')
    msg = json.loads(message['payload'])
    labels = {'sensor': sensor}
    watts = msg['apower']
    volts = msg['voltage']
    amps = msg['current']
    return [
        _sample('powermeter_w', labels, watts),
        _sample('powermeter_voltage_v', labels, volts),
        _sample('powermeter_current_a', labels, amps),
        # This metric used to repeat `apower`, which is already published
        # above as watts. None of the fields read here is an apparent-power
        # reading, so derive volt-amperes as voltage * current instead.
        _sample('powermeter_apparent_power_va', labels, volts * amps),
        _sample('powermeter_temp_f', labels, f_from_c(msg['temperature']['tC'])),
    ]