Mercurial > code > home > repos > mqtt_metrics
annotate mqtt_metrics.py @ 1:3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 16:59:06 -0700 |
parents | 0b5b4ede1bf5 |
children | 579df3a4e62d |
rev | line source |
---|---|
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
1 import asyncio |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
2 import json |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
3 import logging |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
4 import os |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
5 import time |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
6 from weakref import WeakSet |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
7 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
8 import aiomqtt |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
9 import uvicorn # v 2.0.0 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
10 from sse_starlette.sse import EventSourceResponse |
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
11 from starlette.applications import Starlette |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
12 from starlette.requests import Request |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
13 from starlette.responses import JSONResponse |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
14 from starlette.routing import Route |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
15 from starlette_exporter import PrometheusMiddleware, handle_metrics |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
16 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
# Configure the root logger at INFO; `log` below is that root logger.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# One asyncio.Queue per /api/debugEvents request (added in debugEvents).
# A WeakSet holds its members weakly, so a queue that is no longer
# referenced anywhere else drops out of the set on its own.
debugListeners = WeakSet()
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
21 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
22 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
def broadcastToDebugListners(event):
    """Fan one event out to every registered debug listener.

    `event` is serialized with `json.dumps` exactly once, before any
    queue is touched; the resulting string is then handed to each queue
    in the module-level `debugListeners` set via `put_nowait`.
    """
    serialized = json.dumps(event)
    for listenerQueue in debugListeners:
        listenerQueue.put_nowait(serialized)
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
27 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
28 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
async def debugEvents(request):
    """Serve broadcast debug events to one client as server-sent events.

    A fresh queue is registered in `debugListeners`, so every string that
    `broadcastToDebugListners` enqueues is streamed to this client.

    `request` is required by the route-handler signature but is not read.
    Returns an `EventSourceResponse` wrapping the event generator.
    """
    q = asyncio.Queue()
    debugListeners.add(q)

    async def gen():
        try:
            while True:
                yield await q.get()
        finally:
            # Unregister on every exit path (cancellation, generator close,
            # or error). Using `finally` instead of catching CancelledError
            # lets the cancellation propagate rather than being swallowed.
            debugListeners.discard(q)

    return EventSourceResponse(gen())
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
42 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
43 |
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
def _payloadToJsonable(payload):
    """Return `payload` in a form that `json.dumps` accepts.

    bytes/bytearray payloads are decoded as UTF-8; undecodable bytes are
    replaced instead of raising, so one malformed message cannot end the
    subscriber task. Any other value is returned unchanged.
    """
    if isinstance(payload, (bytes, bytearray)):
        return payload.decode('utf-8', errors='replace')
    return payload


async def mqttTask():
    """Subscribe to one MQTT topic and forward each message to debug listeners.

    For every received message this builds a `message` dict (topic, payload,
    receive time rounded to milliseconds) and a `metricEvent` dict, logs
    both, and passes them to `broadcastToDebugListners`.

    Runs until the message iterator ends or an exception propagates; there
    is no reconnect logic here.
    """
    client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
    async with client:
        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
        async for mqttMessage in client.messages:
            message = {
                'topic': mqttMessage.topic.value,
                'payload': _payloadToJsonable(mqttMessage.payload),
                't': round(time.time(), 3),
            }
            # Fixed placeholder values: nothing here is derived from the
            # incoming message yet.
            metricEvent = {
                'name': 'mmm',
                'labels': [{
                    'labelName': 'x',
                    'labelValue': 'y'
                }],
                'value': '0',
            }
            log.info('%s %s', message, metricEvent)
            log.info('len(debugListeners) %s', len(debugListeners))
            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
67 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
68 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
def main():
    """Build the Starlette app and serve it with uvicorn on port 8001.

    The app exposes `/api/debugEvents` (SSE stream) and `/metrics`
    (Prometheus). On startup it schedules `mqttTask` on the same event
    loop that runs the server. Blocks until the server stops.
    """
    loop = asyncio.new_event_loop()

    # The event loop keeps only weak references to tasks, so hold a strong
    # one here; otherwise the MQTT task could be garbage-collected mid-run.
    backgroundTasks = set()

    def onMqttTaskDone(task):
        backgroundTasks.discard(task)
        # Surface a crash immediately instead of leaving it for the
        # "Task exception was never retrieved" warning at collection time.
        if not task.cancelled() and task.exception() is not None:
            log.error('mqttTask exited with an error', exc_info=task.exception())

    def startMqttTask():
        task = loop.create_task(mqttTask())
        backgroundTasks.add(task)
        task.add_done_callback(onMqttTaskDone)

    app = Starlette(on_startup=[startMqttTask], debug=True, routes=[
        Route('/api/debugEvents', debugEvents),
    ])
    app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
    app.add_route("/metrics", handle_metrics)
    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())
0
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
79 |
0b5b4ede1bf5
start with a mockup of the debugging display
drewp@bigasterisk.com
parents:
diff
changeset
|
80 |
1
3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
drewp@bigasterisk.com
parents:
0
diff
changeset
|
# Start the server. There is no `if __name__ == "__main__"` guard, so this
# also runs when the module is imported.
main()