view convert.py @ 9:789042521535

crash on regexp syntax errors
author drewp@bigasterisk.com
date Mon, 12 Aug 2024 13:14:39 -0700
parents bc2a93b306e9
children 2a507e679d0d
line wrap: on
line source

'''
Note that these functions need to parse the message['payload'] into a float
'''
import json

converters = []


def f_from_c(c):
    return (c * 9 / 5) + 32


def topic(topic_pattern: str):

    def decorator(func):
        func.topic_pattern = topic_pattern
        converters.append(func)
        return func

    return decorator


@topic(r'([^-]+)-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
def pm(message, topicGroups: tuple[str]) -> list[dict]:
    return [{
        'name': 'air_quality_pm',
        'labels': [{
            'labelName': 'location',
            'labelValue': topicGroups[0],
        }, {
            'labelName': 'size',
            'labelValue': '10',
        }],
        'value': float(message['payload']),
    }]


@topic(r'([^-]+)-air-quality/sensor/air_temperature_c/state')
def air_temp(message, topicGroups: tuple[str]) -> list[dict]:
    return [{
        'name': 'air_temperature_f',
        'labels': [{
            'labelName': 'location',
            'labelValue': topicGroups[0],
        }],
        'value': f_from_c(float(message['payload'])),
    }]


@topic(r'(tele/st-wall-power/SENSOR|tele/ki-fridge/SENSOR|tt-console-power/SENSOR)')
def poorly_named_sonoff_powermeter(message, topicGroups: tuple[str]) -> list[dict]:
    energy = json.loads(message['payload'])['ENERGY']
    sensor = {
        'tele/st-wall-power/SENSOR': 'st_wall',
        'tele/ki-fridge/SENSOR': 'ki_fridge',
        'tt-console-power/SENSOR': 'tt_console',
    }[topicGroups[0]]
    return [{ 'name': 'powermeter_w',                  'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['Power'] },
            { 'name': 'powermeter_apparent_power_va',  'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['ApparentPower'] },
            { 'name': 'powermeter_reactive_power_var', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['ReactivePower'] },
            { 'name': 'powermeter_factor',             'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['Factor'] },
            { 'name': 'powermeter_voltage_v',          'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['Voltage'] },
            { 'name': 'powermeter_current_a',          'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': energy['Current'] } ] # yapf: disable

@topic(r'(.*)-power/status/switch:0')
def shelly_powermeter(message, topicGroups: tuple[str]) -> list[dict]:
    sensor = topicGroups[0].replace('-', '_')
    msg = json.loads(message['payload'])
    return [
        { 'name': 'powermeter_w',                 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['apower'] },
        { 'name': 'powermeter_voltage_v',         'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['voltage'] },
        { 'name': 'powermeter_current_a',         'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['current'] },
        { 'name': 'powermeter_apparent_power_va', 'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': msg['apower'] },
        { 'name': 'powermeter_temp_f',            'labels': [{'labelName': 'sensor', 'labelValue': sensor}], 'value': f_from_c(msg['temperature']['tC']) },
    ] # yapf: disable