diff mqtt_metrics.py @ 4:cd1b8d7bda78

get metrics writing to victoriametrics
author drewp@bigasterisk.com
date Sat, 10 Aug 2024 21:16:19 -0700
parents 41e36f98f3b8
children 8390d5d0d512
line wrap: on
line diff
--- a/mqtt_metrics.py	Fri Aug 09 18:36:39 2024 -0700
+++ b/mqtt_metrics.py	Sat Aug 10 21:16:19 2024 -0700
@@ -1,3 +1,4 @@
+import textwrap
 import asyncio
 import json
 import logging
@@ -16,9 +17,11 @@
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
 from convert import converters
+from victoriametrics_write import MetricsWriter
 
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger()
+logging.getLogger('httpx').setLevel(logging.WARNING)
 
 debugListeners = WeakSet()
 
@@ -56,26 +59,27 @@
     return message
 
 
-async def mqttTask():
+async def mqttTask(metrics: MetricsWriter):
     try:
         client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
         async with client:
             await client.subscribe('#')
             async for mqttMessage in client.messages:
                 try:
-                    onMqttMessage(mqttMessage)
+                    onMqttMessage(metrics, mqttMessage)
                 except Exception as e:
-                    log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e)
-
+                    log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200))
     except Exception:
         log.error("mqtt task failed", exc_info=True)
         os.abort()
 
-def onMqttMessage(mqttMessage):
+
+def onMqttMessage(metrics, mqttMessage):
     message = simplifyMqttMessage(mqttMessage)
 
     metricEvent = tryConverters(message)
     if metricEvent:
+        metrics.write(message['t'], metricEvent)
         broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
 
 
@@ -99,8 +103,9 @@
 
 
 def main():
+    metrics = MetricsWriter()
     loop = asyncio.new_event_loop()
-    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
+    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[
         Route('/api/debugEvents', debugEvents),
     ])
     app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')