# view mqtt_metrics.py @ 6:bc2a93b306e9
#
# port over the powermeter measurements
# author drewp@bigasterisk.com
# date Sat, 10 Aug 2024 23:03:57 -0700
# parents 8390d5d0d512
# children a640efa9fb01
# line wrap: on
# line source

import textwrap
import asyncio
import json
import logging
import os
import re
import time
from typing import cast
from weakref import WeakSet

import aiomqtt
import uvicorn  # v 2.0.0
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette_exporter import PrometheusMiddleware, handle_metrics

from convert import converters
from victoriametrics_write import MetricsWriter

# Configure the root logger at INFO; `log` below is that root logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
# Only WARNING and above is wanted from the 'httpx' logger.
logging.getLogger('httpx').setLevel(logging.WARNING)

# Queues registered by debugEvents(); broadcastToDebugListeners() pushes to each.
# Held weakly, so a queue that nothing else references drops out of the set.
debugListeners = WeakSet()


def broadcastToDebugListeners(event: dict):
    """Serialize `event` to JSON once and enqueue that string on every debug listener.

    Raises whatever `json.dumps` raises if `event` is not JSON-serializable.
    """
    serialized = json.dumps(event)
    for listenerQueue in debugListeners:
        listenerQueue.put_nowait(serialized)


async def debugEvents(request: Request) -> EventSourceResponse:
    """Server-sent-events endpoint that streams every broadcast debug event.

    Registers a fresh queue in `debugListeners` and returns an
    EventSourceResponse that yields each JSON string put on that queue by
    broadcastToDebugListeners().

    Args:
        request: the incoming request (unused).
    """
    q = asyncio.Queue()
    debugListeners.add(q)

    async def gen():
        try:
            while True:
                yield await q.get()
        finally:
            # Runs on cancellation, generator close, or any other exit, so the
            # listener is always unregistered. Unlike catching CancelledError,
            # this lets the cancellation keep propagating to the caller.
            debugListeners.discard(q)

    return EventSourceResponse(gen())


def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict:
    """Flatten an MQTT message into a plain dict with keys 't', 'topic', 'payload'.

    't' is the current wall-clock time (seconds, rounded to milliseconds),
    'topic' is the topic string, and 'payload' is the message payload with
    `bytes` decoded as UTF-8 (other payload types are passed through as-is).

    Raises:
        UnicodeDecodeError: if a bytes payload is not valid UTF-8.
    """
    now = round(time.time(), 3)
    topic = mqttMessage.topic.value
    payload = mqttMessage.payload
    if isinstance(payload, bytes):
        payload = payload.decode('utf-8')
    return {'t': now, 'topic': topic, 'payload': payload}


async def requestStatuses(client: aiomqtt.Client, period=10):
    """Forever: publish 'status_update' to each command topic, then sleep `period` seconds.

    Shellys post status updates periodically on their own, but they are
    wanted more often than that.
    """
    commandTopics = (
        "do-r-power/command/switch:0",
        "ga-fridge-power/command/switch:0",
        "ga-washer-power/command/switch:0",
        "tt-fridge-power/command/switch:0",
        "ws-bench-power/command/switch:0",
        "ws-desk-power/command/switch:0",
        "ws-solder-power/command/switch:0",
    )
    while True:
        for commandTopic in commandTopics:
            await client.publish(commandTopic, 'status_update')
        await asyncio.sleep(period)


async def mqttTask(metrics: MetricsWriter):
    """Connect to the MQTT broker, subscribe to every topic, and feed each message to onMqttMessage.

    A failure while handling a single message is logged and skipped. Any
    other exception (connection loss, subscribe failure, ...) is logged and
    the whole process is aborted via os.abort().

    Args:
        metrics: writer handed to onMqttMessage for each received message.
    """
    try:
        client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
        async with client:
            # Keep a strong reference: the event loop only holds weak references
            # to tasks, so an unreferenced task may be garbage-collected mid-run.
            statusTask = asyncio.create_task(requestStatuses(client))
            try:
                await client.subscribe('#')
                async for mqttMessage in client.messages:
                    try:
                        onMqttMessage(metrics, mqttMessage)
                    except Exception as e:
                        log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200))
            finally:
                # Don't leave the poller publishing on a client whose context is exiting.
                statusTask.cancel()
    except Exception:
        log.error("mqtt task failed", exc_info=True)
        os.abort()


def onMqttMessage(metrics, mqttMessage):
    """Convert one MQTT message into metric events, writing and broadcasting each.

    Every event produced by tryConverters() is written to `metrics` with the
    message's 't' timestamp and then sent to the debug listeners together
    with the simplified message.
    """
    simplified = simplifyMqttMessage(mqttMessage)

    for event in tryConverters(simplified):
        metrics.write(simplified['t'], event)
        broadcastToDebugListeners({'message': simplified, 'metricEvent': event})


def tryConverters(message) -> list[dict]:
    """Run the first converter whose topic pattern matches `message['topic']`.

    Only the first matching converter (in `converters` order) is invoked; it
    is called with the message and the regex match groups. If it raises, the
    error is logged and no events are returned for it. When more than one
    pattern matches the topic, a warning listing all matching patterns is
    logged.

    Args:
        message: simplified message dict; must contain a 'topic' key.

    Returns:
        The metric events produced by the first matching converter, or an
        empty list if nothing matched or the converter failed.
    """
    metricEvents: list[dict] = []
    matchedPats = []
    for converter in converters:
        if not (m := re.search(converter.topic_pattern, message['topic'])):
            continue
        if not matchedPats:
            # Only the first matching converter gets to produce events.
            try:
                metricEvents.extend(converter(message, cast(tuple[str], m.groups())))
            except Exception:
                log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True)
        # Record every match, not just the first, so overlapping patterns
        # can actually be reported below.
        matchedPats.append(converter.topic_pattern)

    if len(matchedPats) > 1:
        log.warning("Multiple patterns matched: %s", ", ".join(matchedPats))

    return metricEvents


def main():
    """Build the Starlette app, start the MQTT task on startup, and serve on 0.0.0.0:8001.

    The app exposes `/api/debugEvents` (server-sent debug events) and
    `/metrics` (via starlette_exporter). This call blocks until the uvicorn
    server exits.
    """
    metrics = MetricsWriter()
    loop = asyncio.new_event_loop()

    # The event loop keeps only weak references to tasks; hold a strong one
    # until the task finishes so it cannot be garbage-collected mid-run.
    backgroundTasks = set()

    def startMqttTask():
        task = loop.create_task(mqttTask(metrics))
        backgroundTasks.add(task)
        task.add_done_callback(backgroundTasks.discard)

    app = Starlette(on_startup=[startMqttTask], debug=True, routes=[
        Route('/api/debugEvents', debugEvents),
    ])
    app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
    app.add_route("/metrics", handle_metrics)
    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())


# Starts the server as soon as this module is loaded (there is no `__main__` guard).
main()