annotate mqtt_metrics.py @ 9:789042521535

crash on regexp syntax errors
author drewp@bigasterisk.com
date Mon, 12 Aug 2024 13:14:39 -0700
parents 82cec89e6534
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
1 import asyncio
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
2 import json
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
3 import logging
3
41e36f98f3b8 die on mqtt connection errors
drewp@bigasterisk.com
parents: 2
diff changeset
4 import os
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
5 import re
7
a640efa9fb01 logging
drewp@bigasterisk.com
parents: 6
diff changeset
6 import textwrap
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
7 import time
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
8 from typing import cast
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
9 from weakref import WeakSet
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
10
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
11 import aiomqtt
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
12 import uvicorn # v 2.0.0
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
13 from sse_starlette.sse import EventSourceResponse
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
14 from starlette.applications import Starlette
7
a640efa9fb01 logging
drewp@bigasterisk.com
parents: 6
diff changeset
15 from starlette.requests import Request
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
16 from starlette.routing import Route
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
17 from starlette_exporter import PrometheusMiddleware, handle_metrics
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
18
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
19 from convert import converters
4
cd1b8d7bda78 get metrics writing to victoriametrics
drewp@bigasterisk.com
parents: 3
diff changeset
20 from victoriametrics_write import MetricsWriter
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
21
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
# Root logger reports INFO and above for the whole process.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()
# httpx is only allowed to log at WARNING and above.
logging.getLogger('httpx').setLevel(logging.WARNING)

# One asyncio.Queue per open /api/debugEvents stream. Held weakly, so a queue
# whose stream has gone away drops out of the set even without an explicit
# discard.
debugListeners: WeakSet = WeakSet()
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
27
1
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
28
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
def broadcastToDebugListeners(event: dict):
    '''Serialize `event` to JSON once and enqueue it for every debug listener.

    Each member of `debugListeners` is a queue owned by one open debug
    stream; the same JSON string is handed to all of them without waiting.
    '''
    serialized = json.dumps(event)
    for listenerQueue in debugListeners:
        listenerQueue.put_nowait(serialized)
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
33
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
34
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
async def debugEvents(request: Request) -> EventSourceResponse:
    '''Stream every broadcast debug event to one client as Server-Sent Events.

    A fresh queue is registered in `debugListeners` for this request;
    `broadcastToDebugListeners` fills it and the generator below drains it.

    Args:
        request: the incoming HTTP request (not inspected).

    Returns:
        An EventSourceResponse that emits each queued JSON string in order.
    '''
    q = asyncio.Queue()
    debugListeners.add(q)

    async def gen():
        try:
            while True:
                yield await q.get()
        finally:
            # Runs however the stream ends -- task cancellation, the generator
            # being closed, or an unexpected error -- so the queue is always
            # unregistered. CancelledError is deliberately not caught here: it
            # must keep propagating so the caller sees the cancellation.
            debugListeners.discard(q)

    return EventSourceResponse(gen())
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
48
3d7f2dc9beec read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents: 0
diff changeset
49
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict:
    '''Flatten an MQTT message into a plain, JSON-friendly dict.

    Args:
        mqttMessage: message object exposing `.topic.value` and `.payload`.

    Returns:
        A dict with keys:
          't': time this function ran, epoch seconds rounded to milliseconds
          'topic': the topic string
          'payload': the payload, with bytes/bytearray decoded as UTF-8 text;
            any other payload type is passed through unchanged.

    Raises:
        UnicodeDecodeError: if a binary payload is not valid UTF-8.
    '''
    payload = mqttMessage.payload
    # bytearray is decoded as well as bytes: left as-is it would reach
    # json.dumps in broadcastToDebugListeners and fail there.
    if isinstance(payload, (bytes, bytearray)):
        payload = payload.decode('utf-8')

    return {
        't': round(time.time(), 3),
        'topic': mqttMessage.topic.value,
        'payload': payload,
    }
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
60
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
61
6
bc2a93b306e9 port over the powermeter measurements
drewp@bigasterisk.com
parents: 5
diff changeset
async def requestStatuses(client: aiomqtt.Client, period=10, topics=None):
    '''shellys post status updates periodically, but I want them more often.

    Forever: publish 'status_update' to every command topic, then sleep.

    Args:
        client: connected MQTT client; only `publish(topic, payload)` is used.
        period: seconds to sleep after each full pass over the topics.
        topics: command topics to poke. Defaults to the built-in list of
            power-meter switch topics when None.

    Never returns normally; it ends only by cancellation or when
    `client.publish` raises.
    '''
    if topics is None:
        topics = (
            "do-r-power/command/switch:0",
            "ga-fridge-power/command/switch:0",
            "ga-washer-power/command/switch:0",
            "tt-fridge-power/command/switch:0",
            "ws-bench-power/command/switch:0",
            "ws-desk-power/command/switch:0",
            "ws-solder-power/command/switch:0",
        )
    else:
        # Materialize once so a one-shot iterable is still replayed every pass.
        topics = tuple(topics)

    while True:
        for topic in topics:
            await client.publish(topic, 'status_update')
        await asyncio.sleep(period)
bc2a93b306e9 port over the powermeter measurements
drewp@bigasterisk.com
parents: 5
diff changeset
76
bc2a93b306e9 port over the powermeter measurements
drewp@bigasterisk.com
parents: 5
diff changeset
77
4
cd1b8d7bda78 get metrics writing to victoriametrics
drewp@bigasterisk.com
parents: 3
diff changeset
async def mqttTask(metrics: MetricsWriter):
    '''Connect to the MQTT broker, subscribe to every topic, and process messages.

    A failure while handling a single message is logged as a warning and the
    loop carries on. Any other exception (connecting, subscribing, the message
    stream itself) is logged and the whole process is aborted.

    Args:
        metrics: writer handed to onMqttMessage for each received message.
    '''
    try:
        client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
        async with client:
            # Hold the task in a local: the event loop keeps only a weak
            # reference to tasks, so an unreferenced one may be collected
            # while it is still running.
            statusTask = asyncio.create_task(requestStatuses(client))

            def reportStatusFailure(task: asyncio.Task) -> None:
                # Surface a crash of the poller immediately instead of leaving
                # its exception unretrieved.
                if not task.cancelled() and task.exception() is not None:
                    log.error("requestStatuses task failed", exc_info=task.exception())

            statusTask.add_done_callback(reportStatusFailure)
            try:
                await client.subscribe('#')
                async for mqttMessage in client.messages:
                    try:
                        onMqttMessage(metrics, mqttMessage)
                    except Exception as e:
                        log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200))
            finally:
                # Don't leave the poller publishing on a client whose context
                # is being exited.
                statusTask.cancel()
    except Exception:
        log.error("mqtt task failed", exc_info=True)
        os.abort()
2
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
92
4
cd1b8d7bda78 get metrics writing to victoriametrics
drewp@bigasterisk.com
parents: 3
diff changeset
93
cd1b8d7bda78 get metrics writing to victoriametrics
drewp@bigasterisk.com
parents: 3
diff changeset
def onMqttMessage(metrics, mqttMessage):
    '''Handle one received MQTT message.

    The message is simplified to a dict and run through the converters. Each
    resulting metric event is written to `metrics` with the message's 't'
    timestamp, then broadcast (together with the message) to the debug
    listeners.
    '''
    simplified = simplifyMqttMessage(mqttMessage)

    for event in tryConverters(simplified):
        metrics.write(simplified['t'], event)
        debugRecord = {'message': simplified, 'metricEvent': event}
        broadcastToDebugListeners(debugRecord)
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
101
579df3a4e62d rewrite converters as register'able functions
drewp@bigasterisk.com
parents: 1
diff changeset
102
5
8390d5d0d512 one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents: 4
diff changeset
def tryConverters(message) -> list[dict]:
    '''Convert `message` with the first converter whose topic pattern matches.

    Every entry in `converters` has its `topic_pattern` searched against
    message['topic']. Only the first matching converter is called (with the
    message and the match groups). Later matches are merely recorded, so that
    a warning can be logged when more than one pattern matched.

    Returns:
        The metric events from the first matching converter; empty when
        nothing matched or that converter raised (the error is logged).
    '''
    events: list[dict] = []
    hitPatterns = []
    for conv in converters:
        try:
            found = re.search(conv.topic_pattern, message['topic'])
        except Exception:
            # Any failure of the search itself (e.g. an invalid pattern)
            # takes the whole process down rather than skipping the converter.
            log.error('re.search', exc_info=True)
            os.abort()
        if found is None:
            continue
        isFirstHit = len(hitPatterns) == 0
        if isFirstHit:
            try:
                groups = cast(tuple[str], found.groups())
                events.extend(conv(message, groups))
            except Exception:
                log.error("Error converting mqtt message: topic pattern = %r", conv.topic_pattern, exc_info=True)
        # Recorded even when the converter failed or was skipped.
        hitPatterns.append(conv.topic_pattern)

    if len(hitPatterns) >= 2:
        log.warning("Multiple patterns matched: %s", ", ".join(hitPatterns))

    return events
0
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
125
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
126
0b5b4ede1bf5 start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff changeset
def main():
    '''Run the service: MQTT ingest task plus the HTTP app on port 8001.

    The HTTP app serves /api/debugEvents (SSE debug stream) and /metrics
    (Prometheus middleware stats). Blocks until the uvicorn server stops.
    '''
    metrics = MetricsWriter()
    loop = asyncio.new_event_loop()
    # Strong references to background tasks; the event loop itself only keeps
    # weak ones, so a task nobody holds may be collected mid-run.
    backgroundTasks = set()

    def startMqttTask():
        task = loop.create_task(mqttTask(metrics))
        backgroundTasks.add(task)
        task.add_done_callback(backgroundTasks.discard)

    # NOTE(review): debug=True together with host='0.0.0.0' is kept as in the
    # original; confirm that is intended outside development.
    app = Starlette(on_startup=[startMqttTask], debug=True, routes=[
        Route('/api/debugEvents', debugEvents),
    ])
    app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
    app.add_route("/metrics", handle_metrics)
    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())


main()