Mercurial > code > home > repos > mqtt_metrics
annotate mqtt_metrics.py @ 3:41e36f98f3b8
die on mqtt connection errors
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 18:36:39 -0700 |
parents | 579df3a4e62d |
children | cd1b8d7bda78 |
rev | line source |
---|---|
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
1 import asyncio |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
2 import json |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import logging |
3 | 4 import os |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
5 import re |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
6 import time |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
7 from typing import cast |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
8 from weakref import WeakSet |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
9 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
10 import aiomqtt |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
11 import uvicorn # v 2.0.0 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
12 from sse_starlette.sse import EventSourceResponse |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
13 from starlette.applications import Starlette |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
14 from starlette.routing import Route |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
15 from starlette.requests import Request |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
16 from starlette_exporter import PrometheusMiddleware, handle_metrics |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
17 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
18 from convert import converters |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
19 |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
20 logging.basicConfig(level=logging.INFO) |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
21 log = logging.getLogger() |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
22 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
23 debugListeners = WeakSet() |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
24 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
25 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
26 def broadcastToDebugListeners(event: dict): |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
27 j = json.dumps(event) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
28 for lis in debugListeners: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
29 lis.put_nowait(j) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
30 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
31 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
32 async def debugEvents(request: Request) -> EventSourceResponse: |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
33 q = asyncio.Queue() |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
34 debugListeners.add(q) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
35 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
36 async def gen(): |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
37 try: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
38 while True: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
39 yield await q.get() |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
40 except asyncio.CancelledError: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
41 debugListeners.discard(q) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
42 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
43 resp = EventSourceResponse(gen()) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
44 return resp |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
45 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
46 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
47 def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
48 message = { |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
49 't': round(time.time(), 3), |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
50 'topic': mqttMessage.topic.value, |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
51 'payload': mqttMessage.payload, |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
52 } |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
53 if isinstance(message['payload'], bytes): |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
54 message['payload'] = message['payload'].decode('utf-8') |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
55 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
56 return message |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
57 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
58 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
59 async def mqttTask(): |
3 | 60 try: |
61 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") | |
62 async with client: | |
63 await client.subscribe('#') | |
64 async for mqttMessage in client.messages: | |
65 try: | |
66 onMqttMessage(mqttMessage) | |
67 except Exception as e: | |
68 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e) | |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
69 |
3 | 70 except Exception: |
71 log.error("mqtt task failed", exc_info=True) | |
72 os.abort() | |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
73 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
74 def onMqttMessage(mqttMessage): |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
75 message = simplifyMqttMessage(mqttMessage) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
76 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
77 metricEvent = tryConverters(message) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
78 if metricEvent: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
79 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
80 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
81 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
82 def tryConverters(message) -> dict | None: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
83 metricEvent = None |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
84 matchedPats = [] |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
85 for converter in converters: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
86 if not (m := re.search(converter.topic_pattern, message['topic'])): |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
87 continue |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
88 if not matchedPats: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
89 try: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
90 metricEvent = converter(message, cast(tuple[str], m.groups())) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
91 except Exception: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
92 log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
93 matchedPats.append(converter.topic_pattern) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
94 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
95 if len(matchedPats) > 1: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
96 log.warning("Multiple patterns matched: %s", ", ".join(matchedPats)) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
97 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
98 return metricEvent |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
99 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
100 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
101 def main(): |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
102 loop = asyncio.new_event_loop() |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
103 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
104 Route('/api/debugEvents', debugEvents), |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
105 ]) |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
106 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
107 app.add_route("/metrics", handle_metrics) |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
108 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
109 server = uvicorn.Server(config) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
110 loop.run_until_complete(server.serve()) |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
111 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
112 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
113 main() |