Mercurial > code > home > repos > mqtt_metrics
comparison mqtt_metrics.py @ 2:579df3a4e62d
rewrite converters as register'able functions
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 17:37:00 -0700 |
parents | 3d7f2dc9beec |
children | 41e36f98f3b8 |
comparison
equal
deleted
inserted
replaced
1:3d7f2dc9beec | 2:579df3a4e62d |
---|---|
1 import asyncio | 1 import asyncio |
2 import json | 2 import json |
3 import logging | 3 import logging |
4 import os | 4 import re |
5 import time | 5 import time |
6 from typing import cast | |
6 from weakref import WeakSet | 7 from weakref import WeakSet |
7 | 8 |
8 import aiomqtt | 9 import aiomqtt |
9 import uvicorn # v 2.0.0 | 10 import uvicorn # v 2.0.0 |
10 from sse_starlette.sse import EventSourceResponse | 11 from sse_starlette.sse import EventSourceResponse |
11 from starlette.applications import Starlette | 12 from starlette.applications import Starlette |
13 from starlette.routing import Route | |
12 from starlette.requests import Request | 14 from starlette.requests import Request |
13 from starlette.responses import JSONResponse | |
14 from starlette.routing import Route | |
15 from starlette_exporter import PrometheusMiddleware, handle_metrics | 15 from starlette_exporter import PrometheusMiddleware, handle_metrics |
16 | |
17 from convert import converters | |
16 | 18 |
17 logging.basicConfig(level=logging.INFO) | 19 logging.basicConfig(level=logging.INFO) |
18 log = logging.getLogger() | 20 log = logging.getLogger() |
19 | 21 |
20 debugListeners = WeakSet() | 22 debugListeners = WeakSet() |
21 | 23 |
22 | 24 |
23 def broadcastToDebugListners(event): | 25 def broadcastToDebugListeners(event: dict): |
24 j = json.dumps(event) | 26 j = json.dumps(event) |
25 for lis in debugListeners: | 27 for lis in debugListeners: |
26 lis.put_nowait(j) | 28 lis.put_nowait(j) |
27 | 29 |
28 | 30 |
29 async def debugEvents(request): | 31 async def debugEvents(request: Request) -> EventSourceResponse: |
30 q = asyncio.Queue() | 32 q = asyncio.Queue() |
31 debugListeners.add(q) | 33 debugListeners.add(q) |
32 | 34 |
33 async def gen(): | 35 async def gen(): |
34 try: | 36 try: |
39 | 41 |
40 resp = EventSourceResponse(gen()) | 42 resp = EventSourceResponse(gen()) |
41 return resp | 43 return resp |
42 | 44 |
43 | 45 |
46 def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict: | |
47 message = { | |
48 't': round(time.time(), 3), | |
49 'topic': mqttMessage.topic.value, | |
50 'payload': mqttMessage.payload, | |
51 } | |
52 if isinstance(message['payload'], bytes): | |
53 message['payload'] = message['payload'].decode('utf-8') | |
54 | |
55 return message | |
56 | |
57 | |
44 async def mqttTask(): | 58 async def mqttTask(): |
45 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") | 59 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") |
46 async with client: | 60 async with client: |
47 await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') | 61 await client.subscribe('#') |
48 async for mqttMessage in client.messages: | 62 async for mqttMessage in client.messages: |
49 message = { | 63 onMqttMessage(mqttMessage) |
50 'topic': mqttMessage.topic.value, | 64 |
51 'payload': mqttMessage.payload, | 65 |
52 't': round(time.time(), 3), | 66 def onMqttMessage(mqttMessage): |
53 } | 67 message = simplifyMqttMessage(mqttMessage) |
54 if isinstance(message['payload'], bytes): | 68 |
55 message['payload'] = message['payload'].decode('utf-8') | 69 metricEvent = tryConverters(message) |
56 metricEvent = { | 70 if metricEvent: |
57 'name': 'mmm', | 71 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) |
58 'labels': [{ | 72 |
59 'labelName': 'x', | 73 |
60 'labelValue': 'y' | 74 def tryConverters(message) -> dict | None: |
61 }], | 75 metricEvent = None |
62 'value': '0', | 76 matchedPats = [] |
63 } | 77 for converter in converters: |
64 print(message, metricEvent) | 78 if not (m := re.search(converter.topic_pattern, message['topic'])): |
65 print("len(debugListeners)", len(debugListeners)) | 79 continue |
66 broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) | 80 if not matchedPats: |
81 try: | |
82 metricEvent = converter(message, cast(tuple[str], m.groups())) | |
83 except Exception: | |
84 log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True) | |
85 matchedPats.append(converter.topic_pattern) | |
86 | |
87 if len(matchedPats) > 1: | |
88 log.warning("Multiple patterns matched: %s", ", ".join(matchedPats)) | |
89 | |
90 return metricEvent | |
67 | 91 |
68 | 92 |
69 def main(): | 93 def main(): |
70 loop = asyncio.new_event_loop() | 94 loop = asyncio.new_event_loop() |
71 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ | 95 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ |