Mercurial > code > home > repos > mqtt_metrics
annotate mqtt_metrics.py @ 9:789042521535
crash on regexp syntax errors
author | drewp@bigasterisk.com |
---|---|
date | Mon, 12 Aug 2024 13:14:39 -0700 |
parents | 82cec89e6534 |
children |
rev | line source |
---|---|
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
1 import asyncio |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
2 import json |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import logging |
3 | 4 import os |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
5 import re |
7 | 6 import textwrap |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
7 import time |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
8 from typing import cast |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
9 from weakref import WeakSet |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
10 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
11 import aiomqtt |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
12 import uvicorn # v 2.0.0 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
13 from sse_starlette.sse import EventSourceResponse |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
14 from starlette.applications import Starlette |
7 | 15 from starlette.requests import Request |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
16 from starlette.routing import Route |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
17 from starlette_exporter import PrometheusMiddleware, handle_metrics |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
18 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
19 from convert import converters |
4 | 20 from victoriametrics_write import MetricsWriter |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
21 |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
22 logging.basicConfig(level=logging.INFO) |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
23 log = logging.getLogger() |
4 | 24 logging.getLogger('httpx').setLevel(logging.WARNING) |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
25 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
26 debugListeners = WeakSet() |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
27 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
28 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
29 def broadcastToDebugListeners(event: dict): |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
30 j = json.dumps(event) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
31 for lis in debugListeners: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
32 lis.put_nowait(j) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
33 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
34 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
35 async def debugEvents(request: Request) -> EventSourceResponse: |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
36 q = asyncio.Queue() |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
37 debugListeners.add(q) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
38 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
39 async def gen(): |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
40 try: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
41 while True: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
42 yield await q.get() |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
43 except asyncio.CancelledError: |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
44 debugListeners.discard(q) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
45 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
46 resp = EventSourceResponse(gen()) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
47 return resp |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
48 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
49 |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
50 def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
51 message = { |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
52 't': round(time.time(), 3), |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
53 'topic': mqttMessage.topic.value, |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
54 'payload': mqttMessage.payload, |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
55 } |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
56 if isinstance(message['payload'], bytes): |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
57 message['payload'] = message['payload'].decode('utf-8') |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
58 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
59 return message |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
60 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
61 |
6 | 62 async def requestStatuses(client: aiomqtt.Client, period=10): |
63 '''shellys post status updates periodically, but I want them more often.''' | |
64 while True: | |
65 for topic in [ | |
66 "do-r-power/command/switch:0", | |
67 "ga-fridge-power/command/switch:0", | |
68 "ga-washer-power/command/switch:0", | |
69 "tt-fridge-power/command/switch:0", | |
70 "ws-bench-power/command/switch:0", | |
71 "ws-desk-power/command/switch:0", | |
72 "ws-solder-power/command/switch:0", | |
73 ]: | |
74 await client.publish(topic, 'status_update') | |
75 await asyncio.sleep(period) | |
76 | |
77 | |
4 | 78 async def mqttTask(metrics: MetricsWriter): |
3 | 79 try: |
80 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") | |
81 async with client: | |
6 | 82 asyncio.create_task(requestStatuses(client)) |
3 | 83 await client.subscribe('#') |
84 async for mqttMessage in client.messages: | |
85 try: | |
4 | 86 onMqttMessage(metrics, mqttMessage) |
3 | 87 except Exception as e: |
4 | 88 log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200)) |
3 | 89 except Exception: |
90 log.error("mqtt task failed", exc_info=True) | |
91 os.abort() | |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
92 |
4 | 93 |
94 def onMqttMessage(metrics, mqttMessage): | |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
95 message = simplifyMqttMessage(mqttMessage) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
96 |
5
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
97 metricEvents = tryConverters(message) |
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
98 for metricEvent in metricEvents: |
4 | 99 metrics.write(message['t'], metricEvent) |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
100 broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
101 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
102 |
5
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
103 def tryConverters(message) -> list[dict]: |
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
104 metricEvents: list[dict] = [] |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
105 matchedPats = [] |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
106 for converter in converters: |
9 | 107 try: |
108 m = re.search(converter.topic_pattern, message['topic']) | |
109 except Exception: | |
110 log.error('re.search', exc_info=True) | |
111 os.abort() | |
112 if not m: | |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
113 continue |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
114 if not matchedPats: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
115 try: |
5
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
116 metricEvents.extend(converter(message, cast(tuple[str], m.groups()))) |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
117 except Exception: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
118 log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True) |
8
82cec89e6534
fix: the point was to notice when multiple patterns matched
drewp@bigasterisk.com
parents:
7
diff
changeset
|
119 matchedPats.append(converter.topic_pattern) |
2
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
120 |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
121 if len(matchedPats) > 1: |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
122 log.warning("Multiple patterns matched: %s", ", ".join(matchedPats)) |
579df3a4e62d
rewrite converters as register'able functions
drewp@bigasterisk.com
parents:
1
diff
changeset
|
123 |
5
8390d5d0d512
one mqtt message can convert to multiple measurements
drewp@bigasterisk.com
parents:
4
diff
changeset
|
124 return metricEvents |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
125 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
126 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
127 def main(): |
4 | 128 metrics = MetricsWriter() |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
129 loop = asyncio.new_event_loop() |
4 | 130 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[ |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
131 Route('/api/debugEvents', debugEvents), |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
132 ]) |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
133 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
134 app.add_route("/metrics", handle_metrics) |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
135 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
136 server = uvicorn.Server(config) |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
137 loop.run_until_complete(server.serve()) |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
138 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
139 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
140 main() |