annotate mqtt_metrics.py @ 3:41e36f98f3b8

die on mqtt connection errors
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 18:36:39 -0700
parents 579df3a4e62d
children cd1b8d7bda78
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
1 import asyncio
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
2 import json
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
3 import logging
3
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
4 import os
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
5 import re
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
6 import time
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
7 from typing import cast
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
8 from weakref import WeakSet
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
9
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
10 import aiomqtt
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
11 import uvicorn # v 2.0.0
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
12 from sse_starlette.sse import EventSourceResponse
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
13 from starlette.applications import Starlette
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
14 from starlette.routing import Route
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
15 from starlette.requests import Request
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
16 from starlette_exporter import PrometheusMiddleware, handle_metrics
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
17
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
18 from convert import converters
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
19
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
20 logging.basicConfig(level=logging.INFO)
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
21 log = logging.getLogger()
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
22
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
23 debugListeners = WeakSet()
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
24
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
25
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
26 def broadcastToDebugListeners(event: dict):
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
27 j = json.dumps(event)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
28 for lis in debugListeners:
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
29 lis.put_nowait(j)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
30
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
31
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
32 async def debugEvents(request: Request) -> EventSourceResponse:
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
33 q = asyncio.Queue()
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
34 debugListeners.add(q)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
35
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
36 async def gen():
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
37 try:
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
38 while True:
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
39 yield await q.get()
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
40 except asyncio.CancelledError:
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
41 debugListeners.discard(q)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
42
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
43 resp = EventSourceResponse(gen())
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
44 return resp
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
45
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
46
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
47 def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
48 message = {
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
49 't': round(time.time(), 3),
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
50 'topic': mqttMessage.topic.value,
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
51 'payload': mqttMessage.payload,
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
52 }
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
53 if isinstance(message['payload'], bytes):
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
54 message['payload'] = message['payload'].decode('utf-8')
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
55
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
56 return message
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
57
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
58
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
59 async def mqttTask():
3
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
60 try:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
61 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
62 async with client:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
63 await client.subscribe('#')
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
64 async for mqttMessage in client.messages:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
65 try:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
66 onMqttMessage(mqttMessage)
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
67 except Exception as e:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
68 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e)
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
69
3
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
70 except Exception:
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
71 log.error("mqtt task failed", exc_info=True)
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
72 os.abort()
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
73
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
74 def onMqttMessage(mqttMessage):
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
75 message = simplifyMqttMessage(mqttMessage)
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
76
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
77 metricEvent = tryConverters(message)
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
78 if metricEvent:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
79 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
80
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
81
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
82 def tryConverters(message) -> dict | None:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
83 metricEvent = None
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
84 matchedPats = []
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
85 for converter in converters:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
86 if not (m := re.search(converter.topic_pattern, message['topic'])):
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
87 continue
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
88 if not matchedPats:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
89 try:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
90 metricEvent = converter(message, cast(tuple[str], m.groups()))
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
91 except Exception:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
92 log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True)
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
93 matchedPats.append(converter.topic_pattern)
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
94
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
95 if len(matchedPats) > 1:
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
96 log.warning("Multiple patterns matched: %s", ", ".join(matchedPats))
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
97
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
98 return metricEvent
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
99
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
100
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
101 def main():
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
102 loop = asyncio.new_event_loop()
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
103 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
104 Route('/api/debugEvents', debugEvents),
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
105 ])
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
106 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
107 app.add_route("/metrics", handle_metrics)
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
108 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
109 server = uvicorn.Server(config)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
110 loop.run_until_complete(server.serve())
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
111
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
112
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
113 main()