comparison mqtt_metrics.py @ 4:cd1b8d7bda78

get metrics writing to victoriametrics
author drewp@bigasterisk.com
date Sat, 10 Aug 2024 21:16:19 -0700
parents 41e36f98f3b8
children 8390d5d0d512
comparison
equal deleted inserted replaced
3:41e36f98f3b8 4:cd1b8d7bda78
1 import textwrap
1 import asyncio 2 import asyncio
2 import json 3 import json
3 import logging 4 import logging
4 import os 5 import os
5 import re 6 import re
14 from starlette.routing import Route 15 from starlette.routing import Route
15 from starlette.requests import Request 16 from starlette.requests import Request
16 from starlette_exporter import PrometheusMiddleware, handle_metrics 17 from starlette_exporter import PrometheusMiddleware, handle_metrics
17 18
18 from convert import converters 19 from convert import converters
20 from victoriametrics_write import MetricsWriter
19 21
20 logging.basicConfig(level=logging.INFO) 22 logging.basicConfig(level=logging.INFO)
21 log = logging.getLogger() 23 log = logging.getLogger()
24 logging.getLogger('httpx').setLevel(logging.WARNING)
22 25
23 debugListeners = WeakSet() 26 debugListeners = WeakSet()
24 27
25 28
26 def broadcastToDebugListeners(event: dict): 29 def broadcastToDebugListeners(event: dict):
54 message['payload'] = message['payload'].decode('utf-8') 57 message['payload'] = message['payload'].decode('utf-8')
55 58
56 return message 59 return message
57 60
58 61
59 async def mqttTask(): 62 async def mqttTask(metrics: MetricsWriter):
60 try: 63 try:
61 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") 64 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
62 async with client: 65 async with client:
63 await client.subscribe('#') 66 await client.subscribe('#')
64 async for mqttMessage in client.messages: 67 async for mqttMessage in client.messages:
65 try: 68 try:
66 onMqttMessage(mqttMessage) 69 onMqttMessage(metrics, mqttMessage)
67 except Exception as e: 70 except Exception as e:
68 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e) 71 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200))
69
70 except Exception: 72 except Exception:
71 log.error("mqtt task failed", exc_info=True) 73 log.error("mqtt task failed", exc_info=True)
72 os.abort() 74 os.abort()
73 75
74 def onMqttMessage(mqttMessage): 76
77 def onMqttMessage(metrics, mqttMessage):
75 message = simplifyMqttMessage(mqttMessage) 78 message = simplifyMqttMessage(mqttMessage)
76 79
77 metricEvent = tryConverters(message) 80 metricEvent = tryConverters(message)
78 if metricEvent: 81 if metricEvent:
82 metrics.write(message['t'], metricEvent)
79 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) 83 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
80 84
81 85
82 def tryConverters(message) -> dict | None: 86 def tryConverters(message) -> dict | None:
83 metricEvent = None 87 metricEvent = None
97 101
98 return metricEvent 102 return metricEvent
99 103
100 104
101 def main(): 105 def main():
106 metrics = MetricsWriter()
102 loop = asyncio.new_event_loop() 107 loop = asyncio.new_event_loop()
103 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ 108 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[
104 Route('/api/debugEvents', debugEvents), 109 Route('/api/debugEvents', debugEvents),
105 ]) 110 ])
106 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') 111 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
107 app.add_route("/metrics", handle_metrics) 112 app.add_route("/metrics", handle_metrics)
108 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') 113 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')