Mercurial > code > home > repos > mqtt_metrics
diff mqtt_metrics.py @ 4:cd1b8d7bda78
get metrics writing to victoriametrics
author | drewp@bigasterisk.com |
---|---|
date | Sat, 10 Aug 2024 21:16:19 -0700 |
parents | 41e36f98f3b8 |
children | 8390d5d0d512 |
line wrap: on
line diff
--- a/mqtt_metrics.py Fri Aug 09 18:36:39 2024 -0700 +++ b/mqtt_metrics.py Sat Aug 10 21:16:19 2024 -0700 @@ -1,3 +1,4 @@ +import textwrap import asyncio import json import logging @@ -16,9 +17,11 @@ from starlette_exporter import PrometheusMiddleware, handle_metrics from convert import converters +from victoriametrics_write import MetricsWriter logging.basicConfig(level=logging.INFO) log = logging.getLogger() +logging.getLogger('httpx').setLevel(logging.WARNING) debugListeners = WeakSet() @@ -56,26 +59,27 @@ return message -async def mqttTask(): +async def mqttTask(metrics: MetricsWriter): try: client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") async with client: await client.subscribe('#') async for mqttMessage in client.messages: try: - onMqttMessage(mqttMessage) + onMqttMessage(metrics, mqttMessage) except Exception as e: - log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e) - + log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200)) except Exception: log.error("mqtt task failed", exc_info=True) os.abort() -def onMqttMessage(mqttMessage): + +def onMqttMessage(metrics, mqttMessage): message = simplifyMqttMessage(mqttMessage) metricEvent = tryConverters(message) if metricEvent: + metrics.write(message['t'], metricEvent) broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) @@ -99,8 +103,9 @@ def main(): + metrics = MetricsWriter() loop = asyncio.new_event_loop() - app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ + app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[ Route('/api/debugEvents', debugEvents), ]) app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')