Mercurial > code > home > repos > mqtt_metrics
view mqtt_metrics.py @ 10:2a507e679d0d default tip
add zigbee covnerters
author | drewp@bigasterisk.com |
---|---|
date | Mon, 12 Aug 2024 13:15:07 -0700 |
parents | 789042521535 |
children |
line wrap: on
line source
import asyncio import json import logging import os import re import textwrap import time from typing import cast from weakref import WeakSet import aiomqtt import uvicorn # v 2.0.0 from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics from convert import converters from victoriametrics_write import MetricsWriter logging.basicConfig(level=logging.INFO) log = logging.getLogger() logging.getLogger('httpx').setLevel(logging.WARNING) debugListeners = WeakSet() def broadcastToDebugListeners(event: dict): j = json.dumps(event) for lis in debugListeners: lis.put_nowait(j) async def debugEvents(request: Request) -> EventSourceResponse: q = asyncio.Queue() debugListeners.add(q) async def gen(): try: while True: yield await q.get() except asyncio.CancelledError: debugListeners.discard(q) resp = EventSourceResponse(gen()) return resp def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict: message = { 't': round(time.time(), 3), 'topic': mqttMessage.topic.value, 'payload': mqttMessage.payload, } if isinstance(message['payload'], bytes): message['payload'] = message['payload'].decode('utf-8') return message async def requestStatuses(client: aiomqtt.Client, period=10): '''shellys post status updates periodically, but I want them more often.''' while True: for topic in [ "do-r-power/command/switch:0", "ga-fridge-power/command/switch:0", "ga-washer-power/command/switch:0", "tt-fridge-power/command/switch:0", "ws-bench-power/command/switch:0", "ws-desk-power/command/switch:0", "ws-solder-power/command/switch:0", ]: await client.publish(topic, 'status_update') await asyncio.sleep(period) async def mqttTask(metrics: MetricsWriter): try: client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") async with client: asyncio.create_task(requestStatuses(client)) await client.subscribe('#') async for mqttMessage in client.messages: try: onMqttMessage(metrics, mqttMessage) except Exception as e: log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200)) except Exception: log.error("mqtt task failed", exc_info=True) os.abort() def onMqttMessage(metrics, mqttMessage): message = simplifyMqttMessage(mqttMessage) metricEvents = tryConverters(message) for metricEvent in metricEvents: metrics.write(message['t'], metricEvent) broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) def tryConverters(message) -> list[dict]: metricEvents: list[dict] = [] matchedPats = [] for converter in converters: try: m = re.search(converter.topic_pattern, message['topic']) except Exception: log.error('re.search', exc_info=True) os.abort() if not m: continue if not matchedPats: try: metricEvents.extend(converter(message, cast(tuple[str], m.groups()))) except Exception: log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True) matchedPats.append(converter.topic_pattern) if len(matchedPats) > 1: log.warning("Multiple patterns matched: %s", ", ".join(matchedPats)) return metricEvents def main(): metrics = MetricsWriter() loop = asyncio.new_event_loop() app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[ Route('/api/debugEvents', debugEvents), ]) app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') app.add_route("/metrics", handle_metrics) config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') server = uvicorn.Server(config) loop.run_until_complete(server.serve()) main()