Mercurial > code > home > repos > mqtt_metrics
comparison mqtt_metrics.py @ 4:cd1b8d7bda78
get metrics writing to victoriametrics
author | drewp@bigasterisk.com |
---|---|
date | Sat, 10 Aug 2024 21:16:19 -0700 |
parents | 41e36f98f3b8 |
children | 8390d5d0d512 |
comparison
equal
deleted
inserted
replaced
3:41e36f98f3b8 | 4:cd1b8d7bda78 |
---|---|
1 import textwrap | |
1 import asyncio | 2 import asyncio |
2 import json | 3 import json |
3 import logging | 4 import logging |
4 import os | 5 import os |
5 import re | 6 import re |
14 from starlette.routing import Route | 15 from starlette.routing import Route |
15 from starlette.requests import Request | 16 from starlette.requests import Request |
16 from starlette_exporter import PrometheusMiddleware, handle_metrics | 17 from starlette_exporter import PrometheusMiddleware, handle_metrics |
17 | 18 |
18 from convert import converters | 19 from convert import converters |
20 from victoriametrics_write import MetricsWriter | |
19 | 21 |
20 logging.basicConfig(level=logging.INFO) | 22 logging.basicConfig(level=logging.INFO) |
21 log = logging.getLogger() | 23 log = logging.getLogger() |
24 logging.getLogger('httpx').setLevel(logging.WARNING) | |
22 | 25 |
23 debugListeners = WeakSet() | 26 debugListeners = WeakSet() |
24 | 27 |
25 | 28 |
26 def broadcastToDebugListeners(event: dict): | 29 def broadcastToDebugListeners(event: dict): |
54 message['payload'] = message['payload'].decode('utf-8') | 57 message['payload'] = message['payload'].decode('utf-8') |
55 | 58 |
56 return message | 59 return message |
57 | 60 |
58 | 61 |
59 async def mqttTask(): | 62 async def mqttTask(metrics: MetricsWriter): |
60 try: | 63 try: |
61 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") | 64 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") |
62 async with client: | 65 async with client: |
63 await client.subscribe('#') | 66 await client.subscribe('#') |
64 async for mqttMessage in client.messages: | 67 async for mqttMessage in client.messages: |
65 try: | 68 try: |
66 onMqttMessage(mqttMessage) | 69 onMqttMessage(metrics, mqttMessage) |
67 except Exception as e: | 70 except Exception as e: |
68 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e) | 71 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200)) |
69 | |
70 except Exception: | 72 except Exception: |
71 log.error("mqtt task failed", exc_info=True) | 73 log.error("mqtt task failed", exc_info=True) |
72 os.abort() | 74 os.abort() |
73 | 75 |
74 def onMqttMessage(mqttMessage): | 76 |
77 def onMqttMessage(metrics, mqttMessage): | |
75 message = simplifyMqttMessage(mqttMessage) | 78 message = simplifyMqttMessage(mqttMessage) |
76 | 79 |
77 metricEvent = tryConverters(message) | 80 metricEvent = tryConverters(message) |
78 if metricEvent: | 81 if metricEvent: |
82 metrics.write(message['t'], metricEvent) | |
79 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) | 83 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) |
80 | 84 |
81 | 85 |
82 def tryConverters(message) -> dict | None: | 86 def tryConverters(message) -> dict | None: |
83 metricEvent = None | 87 metricEvent = None |
97 | 101 |
98 return metricEvent | 102 return metricEvent |
99 | 103 |
100 | 104 |
101 def main(): | 105 def main(): |
106 metrics = MetricsWriter() | |
102 loop = asyncio.new_event_loop() | 107 loop = asyncio.new_event_loop() |
103 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ | 108 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[ |
104 Route('/api/debugEvents', debugEvents), | 109 Route('/api/debugEvents', debugEvents), |
105 ]) | 110 ]) |
106 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') | 111 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') |
107 app.add_route("/metrics", handle_metrics) | 112 app.add_route("/metrics", handle_metrics) |
108 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') | 113 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') |