Mercurial > code > home > repos > mqtt_metrics
comparison mqtt_metrics.py @ 1:3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 16:59:06 -0700 |
parents | 0b5b4ede1bf5 |
children | 579df3a4e62d |
comparison
equal
deleted
inserted
replaced
0:0b5b4ede1bf5 | 1:3d7f2dc9beec |
---|---|
1 import asyncio | |
2 import json | |
1 import logging | 3 import logging |
4 import os | |
5 import time | |
6 from weakref import WeakSet | |
2 | 7 |
8 import aiomqtt | |
9 import uvicorn # v 2.0.0 | |
10 from sse_starlette.sse import EventSourceResponse | |
3 from starlette.applications import Starlette | 11 from starlette.applications import Starlette |
4 from starlette.requests import Request | 12 from starlette.requests import Request |
5 from starlette.responses import JSONResponse | 13 from starlette.responses import JSONResponse |
6 from starlette.routing import Route | 14 from starlette.routing import Route |
7 from starlette_exporter import PrometheusMiddleware, handle_metrics | 15 from starlette_exporter import PrometheusMiddleware, handle_metrics |
8 | 16 |
9 logging.basicConfig(level=logging.INFO) | 17 logging.basicConfig(level=logging.INFO) |
10 log = logging.getLogger() | 18 log = logging.getLogger() |
11 | 19 |
20 debugListeners = WeakSet() | |
12 | 21 |
13 def hello(request: Request) -> JSONResponse: | 22 |
14 return JSONResponse({"demo": "hello"}) | 23 def broadcastToDebugListners(event): |
24 j = json.dumps(event) | |
25 for lis in debugListeners: | |
26 lis.put_nowait(j) | |
27 | |
28 | |
29 async def debugEvents(request): | |
30 q = asyncio.Queue() | |
31 debugListeners.add(q) | |
32 | |
33 async def gen(): | |
34 try: | |
35 while True: | |
36 yield await q.get() | |
37 except asyncio.CancelledError: | |
38 debugListeners.discard(q) | |
39 | |
40 resp = EventSourceResponse(gen()) | |
41 return resp | |
42 | |
43 | |
44 async def mqttTask(): | |
45 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") | |
46 async with client: | |
47 await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') | |
48 async for mqttMessage in client.messages: | |
49 message = { | |
50 'topic': mqttMessage.topic.value, | |
51 'payload': mqttMessage.payload, | |
52 't': round(time.time(), 3), | |
53 } | |
54 if isinstance(message['payload'], bytes): | |
55 message['payload'] = message['payload'].decode('utf-8') | |
56 metricEvent = { | |
57 'name': 'mmm', | |
58 'labels': [{ | |
59 'labelName': 'x', | |
60 'labelValue': 'y' | |
61 }], | |
62 'value': '0', | |
63 } | |
64 print(message, metricEvent) | |
65 print("len(debugListeners)", len(debugListeners)) | |
66 broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) | |
15 | 67 |
16 | 68 |
17 def main(): | 69 def main(): |
18 app = Starlette(debug=True, routes=[ | 70 loop = asyncio.new_event_loop() |
19 Route('/api/hello', hello), | 71 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ |
72 Route('/api/debugEvents', debugEvents), | |
20 ]) | 73 ]) |
21 | |
22 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') | 74 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') |
23 app.add_route("/metrics", handle_metrics) | 75 app.add_route("/metrics", handle_metrics) |
24 | 76 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') |
25 return app | 77 server = uvicorn.Server(config) |
78 loop.run_until_complete(server.serve()) | |
26 | 79 |
27 | 80 |
28 app = main() | 81 main() |