comparison mqtt_metrics.py @ 1:3d7f2dc9beec

read 1 mqtt topic; dummy convert; route to debugging view
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 16:59:06 -0700
parents 0b5b4ede1bf5
children 579df3a4e62d
comparison
equal deleted inserted replaced
0:0b5b4ede1bf5 1:3d7f2dc9beec
1 import asyncio
2 import json
1 import logging 3 import logging
4 import os
5 import time
6 from weakref import WeakSet
2 7
8 import aiomqtt
9 import uvicorn # v 2.0.0
10 from sse_starlette.sse import EventSourceResponse
3 from starlette.applications import Starlette 11 from starlette.applications import Starlette
4 from starlette.requests import Request 12 from starlette.requests import Request
5 from starlette.responses import JSONResponse 13 from starlette.responses import JSONResponse
6 from starlette.routing import Route 14 from starlette.routing import Route
7 from starlette_exporter import PrometheusMiddleware, handle_metrics 15 from starlette_exporter import PrometheusMiddleware, handle_metrics
8 16
9 logging.basicConfig(level=logging.INFO) 17 logging.basicConfig(level=logging.INFO)
10 log = logging.getLogger() 18 log = logging.getLogger()
11 19
20 debugListeners = WeakSet()
12 21
13 def hello(request: Request) -> JSONResponse: 22
14 return JSONResponse({"demo": "hello"}) 23 def broadcastToDebugListners(event):
24 j = json.dumps(event)
25 for lis in debugListeners:
26 lis.put_nowait(j)
27
28
29 async def debugEvents(request):
30 q = asyncio.Queue()
31 debugListeners.add(q)
32
33 async def gen():
34 try:
35 while True:
36 yield await q.get()
37 except asyncio.CancelledError:
38 debugListeners.discard(q)
39
40 resp = EventSourceResponse(gen())
41 return resp
42
43
44 async def mqttTask():
45 client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
46 async with client:
47 await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
48 async for mqttMessage in client.messages:
49 message = {
50 'topic': mqttMessage.topic.value,
51 'payload': mqttMessage.payload,
52 't': round(time.time(), 3),
53 }
54 if isinstance(message['payload'], bytes):
55 message['payload'] = message['payload'].decode('utf-8')
56 metricEvent = {
57 'name': 'mmm',
58 'labels': [{
59 'labelName': 'x',
60 'labelValue': 'y'
61 }],
62 'value': '0',
63 }
64 print(message, metricEvent)
65 print("len(debugListeners)", len(debugListeners))
66 broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})
15 67
16 68
17 def main(): 69 def main():
18 app = Starlette(debug=True, routes=[ 70 loop = asyncio.new_event_loop()
19 Route('/api/hello', hello), 71 app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
72 Route('/api/debugEvents', debugEvents),
20 ]) 73 ])
21
22 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') 74 app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
23 app.add_route("/metrics", handle_metrics) 75 app.add_route("/metrics", handle_metrics)
24 76 config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
25 return app 77 server = uvicorn.Server(config)
78 loop.run_until_complete(server.serve())
26 79
27 80
28 app = main() 81 main()