Mercurial > code > home > repos > mqtt_metrics
changeset 2:579df3a4e62d
rewrite converters as register'able functions
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 17:37:00 -0700 |
parents | 3d7f2dc9beec |
children | 41e36f98f3b8 |
files | convert.py mqtt_metrics.py src/main.ts |
diffstat | 3 files changed, 87 insertions(+), 25 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/convert.py Fri Aug 09 17:37:00 2024 -0700 @@ -0,0 +1,38 @@ +converters = [] + + +def topic(topic_pattern: str): + + def decorator(func): + func.topic_pattern = topic_pattern + converters.append(func) + return func + + return decorator + + +@topic(r'([^-]+)-air-quality/sensor/particulate_matter__10_0__m_concentration/state') +def pm(message, topicGroups: tuple[str]): + return { + 'name': 'air_quality_pm', + 'labels': [{ + 'labelName': 'location', + 'labelValue': topicGroups[0], + }, { + 'labelName': 'size', + 'labelValue': '10', + }], + 'value': message['payload'], + } + + +@topic(r'([^-]+)-air-quality/sensor/air_temperature_c/state') +def air_temp(message, topicGroups: tuple[str]): + return { + 'name': 'air_temperature_f', + 'labels': [{ + 'labelName': 'location', + 'labelValue': topicGroups[0], + }], + 'value': (float(message['payload']) * 9 / 5) + 32, + }
--- a/mqtt_metrics.py Fri Aug 09 16:59:06 2024 -0700 +++ b/mqtt_metrics.py Fri Aug 09 17:37:00 2024 -0700 @@ -1,32 +1,34 @@ import asyncio import json import logging -import os +import re import time +from typing import cast from weakref import WeakSet import aiomqtt import uvicorn # v 2.0.0 from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette +from starlette.routing import Route from starlette.requests import Request -from starlette.responses import JSONResponse -from starlette.routing import Route from starlette_exporter import PrometheusMiddleware, handle_metrics +from convert import converters + logging.basicConfig(level=logging.INFO) log = logging.getLogger() debugListeners = WeakSet() -def broadcastToDebugListners(event): +def broadcastToDebugListeners(event: dict): j = json.dumps(event) for lis in debugListeners: lis.put_nowait(j) -async def debugEvents(request): +async def debugEvents(request: Request) -> EventSourceResponse: q = asyncio.Queue() debugListeners.add(q) @@ -41,29 +43,51 @@ return resp +def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict: + message = { + 't': round(time.time(), 3), + 'topic': mqttMessage.topic.value, + 'payload': mqttMessage.payload, + } + if isinstance(message['payload'], bytes): + message['payload'] = message['payload'].decode('utf-8') + + return message + + async def mqttTask(): client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") async with client: - await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') + await client.subscribe('#') async for mqttMessage in client.messages: - message = { - 'topic': mqttMessage.topic.value, - 'payload': mqttMessage.payload, - 't': round(time.time(), 3), - } - if isinstance(message['payload'], bytes): - message['payload'] = message['payload'].decode('utf-8') - metricEvent = { - 'name': 'mmm', - 'labels': [{ - 'labelName': 'x', - 'labelValue': 'y' - }], - 'value': '0', - } - print(message, metricEvent) - print("len(debugListeners)", len(debugListeners)) - broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) + onMqttMessage(mqttMessage) + + +def onMqttMessage(mqttMessage): + message = simplifyMqttMessage(mqttMessage) + + metricEvent = tryConverters(message) + if metricEvent: + broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) + + +def tryConverters(message) -> dict | None: + metricEvent = None + matchedPats = [] + for converter in converters: + if not (m := re.search(converter.topic_pattern, message['topic'])): + continue + if not matchedPats: + try: + metricEvent = converter(message, cast(tuple[str], m.groups())) + except Exception: + log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True) + matchedPats.append(converter.topic_pattern) + + if len(matchedPats) > 1: + log.warning("Multiple patterns matched: %s", ", ".join(matchedPats)) + + return metricEvent def main():
--- a/src/main.ts Fri Aug 09 16:59:06 2024 -0700 +++ b/src/main.ts Fri Aug 09 17:37:00 2024 -0700 @@ -98,7 +98,7 @@ <span class="msgKey">t</span>=<span class="msgValue"> ${t.toLocaleString("sv")} </span> <span class="msgKey">topic</span>=<span class="msgValue" >${ev.message.topic}</span > - <span class="msgKey">message</span>=<span class="msgValue">${ev.message.payload}</span> + <span class="msgKey">payload</span>=<span class="msgValue">${ev.message.payload}</span> </div> <div> Converted to metric: