comparison mqtt_metrics.py @ 2:579df3a4e62d

rewrite converters as register'able functions
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 17:37:00 -0700
parents 3d7f2dc9beec
children 41e36f98f3b8
comparison
equal deleted inserted replaced
1:3d7f2dc9beec 2:579df3a4e62d
1 import asyncio 1 import asyncio
2 import json 2 import json
3 import logging 3 import logging
4 import os 4 import re
5 import time 5 import time
6 from typing import cast
6 from weakref import WeakSet 7 from weakref import WeakSet
7 8
8 import aiomqtt 9 import aiomqtt
9 import uvicorn # v 2.0.0 10 import uvicorn # v 2.0.0
10 from sse_starlette.sse import EventSourceResponse 11 from sse_starlette.sse import EventSourceResponse
11 from starlette.applications import Starlette 12 from starlette.applications import Starlette
13 from starlette.routing import Route
12 from starlette.requests import Request 14 from starlette.requests import Request
13 from starlette.responses import JSONResponse
14 from starlette.routing import Route
15 from starlette_exporter import PrometheusMiddleware, handle_metrics 15 from starlette_exporter import PrometheusMiddleware, handle_metrics
16
17 from convert import converters
16 18
17 logging.basicConfig(level=logging.INFO) 19 logging.basicConfig(level=logging.INFO)
18 log = logging.getLogger() 20 log = logging.getLogger()
19 21
20 debugListeners = WeakSet() 22 debugListeners = WeakSet()
21 23
22 24
23 def broadcastToDebugListners(event): 25 def broadcastToDebugListeners(event: dict):
24 j = json.dumps(event) 26 j = json.dumps(event)
25 for lis in debugListeners: 27 for lis in debugListeners:
26 lis.put_nowait(j) 28 lis.put_nowait(j)
27 29
28 30
29 async def debugEvents(request): 31 async def debugEvents(request: Request) -> EventSourceResponse:
30 q = asyncio.Queue() 32 q = asyncio.Queue()
31 debugListeners.add(q) 33 debugListeners.add(q)
32 34
33 async def gen(): 35 async def gen():
34 try: 36 try:
39 41
40 resp = EventSourceResponse(gen()) 42 resp = EventSourceResponse(gen())
41 return resp 43 return resp
42 44
43 45
def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict:
    """Flatten an MQTT message into a plain dict.

    Returns a dict with three keys:
      't'       -- time.time() at the moment of the call, rounded to 3 decimals
      'topic'   -- mqttMessage.topic.value
      'payload' -- mqttMessage.payload; bytes/bytearray payloads are decoded
                   to str as UTF-8, any other payload value is passed through
                   unchanged.
    """
    payload = mqttMessage.payload
    if isinstance(payload, (bytes, bytearray)):
        # Payloads are not guaranteed to be valid UTF-8. A strict decode would
        # raise UnicodeDecodeError for a single binary message, so undecodable
        # bytes are replaced with U+FFFD instead. bytearray is decoded too so
        # the resulting dict stays serializable with json.dumps.
        payload = payload.decode('utf-8', errors='replace')

    return {
        't': round(time.time(), 3),
        'topic': mqttMessage.topic.value,
        'payload': payload,
    }
56
57
44 async def mqttTask(): 58 async def mqttTask():
45 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") 59 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
46 async with client: 60 async with client:
47 await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') 61 await client.subscribe('#')
48 async for mqttMessage in client.messages: 62 async for mqttMessage in client.messages:
49 message = { 63 onMqttMessage(mqttMessage)
50 'topic': mqttMessage.topic.value, 64
51 'payload': mqttMessage.payload, 65
52 't': round(time.time(), 3), 66 def onMqttMessage(mqttMessage):
53 } 67 message = simplifyMqttMessage(mqttMessage)
54 if isinstance(message['payload'], bytes): 68
55 message['payload'] = message['payload'].decode('utf-8') 69 metricEvent = tryConverters(message)
56 metricEvent = { 70 if metricEvent:
57 'name': 'mmm', 71 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
58 'labels': [{ 72
59 'labelName': 'x', 73
60 'labelValue': 'y' 74 def tryConverters(message) -> dict | None:
61 }], 75 metricEvent = None
62 'value': '0', 76 matchedPats = []
63 } 77 for converter in converters:
64 print(message, metricEvent) 78 if not (m := re.search(converter.topic_pattern, message['topic'])):
65 print("len(debugListeners)", len(debugListeners)) 79 continue
66 broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) 80 if not matchedPats:
81 try:
82 metricEvent = converter(message, cast(tuple[str], m.groups()))
83 except Exception:
84 log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True)
85 matchedPats.append(converter.topic_pattern)
86
87 if len(matchedPats) > 1:
88 log.warning("Multiple patterns matched: %s", ", ".join(matchedPats))
89
90 return metricEvent
67 91
68 92
69 def main(): 93 def main():
70 loop = asyncio.new_event_loop() 94 loop = asyncio.new_event_loop()
71 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ 95 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[