Mercurial > code > home > repos > mqtt_metrics
diff mqtt_metrics.py @ 1:3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 16:59:06 -0700 |
parents | 0b5b4ede1bf5 |
children | 579df3a4e62d |
line wrap: on
line diff
--- a/mqtt_metrics.py Fri Aug 09 15:09:22 2024 -0700 +++ b/mqtt_metrics.py Fri Aug 09 16:59:06 2024 -0700 @@ -1,5 +1,13 @@ +import asyncio +import json import logging +import os +import time +from weakref import WeakSet +import aiomqtt +import uvicorn # v 2.0.0 +from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse @@ -9,20 +17,65 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger() +debugListeners = WeakSet() -def hello(request: Request) -> JSONResponse: - return JSONResponse({"demo": "hello"}) + +def broadcastToDebugListners(event): + j = json.dumps(event) + for lis in debugListeners: + lis.put_nowait(j) + + +async def debugEvents(request): + q = asyncio.Queue() + debugListeners.add(q) + + async def gen(): + try: + while True: + yield await q.get() + except asyncio.CancelledError: + debugListeners.discard(q) + + resp = EventSourceResponse(gen()) + return resp + + +async def mqttTask(): + client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") + async with client: + await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') + async for mqttMessage in client.messages: + message = { + 'topic': mqttMessage.topic.value, + 'payload': mqttMessage.payload, + 't': round(time.time(), 3), + } + if isinstance(message['payload'], bytes): + message['payload'] = message['payload'].decode('utf-8') + metricEvent = { + 'name': 'mmm', + 'labels': [{ + 'labelName': 'x', + 'labelValue': 'y' + }], + 'value': '0', + } + print(message, metricEvent) + print("len(debugListeners)", len(debugListeners)) + broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) def main(): - app = Starlette(debug=True, routes=[ - Route('/api/hello', hello), + loop = asyncio.new_event_loop() + app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ + Route('/api/debugEvents', debugEvents), ]) - app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') app.add_route("/metrics", handle_metrics) - - return app + config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') + server = uvicorn.Server(config) + loop.run_until_complete(server.serve()) -app = main() +main()