Mercurial > code > home > repos > mqtt_metrics
view mqtt_metrics.py @ 1:3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 16:59:06 -0700 |
parents | 0b5b4ede1bf5 |
children | 579df3a4e62d |
line wrap: on
line source
"""Subscribe to one MQTT topic and fan each message out to debug viewers.

Every incoming MQTT message is paired with a placeholder metric event and
pushed to all clients connected to the ``/api/debugEvents`` server-sent-events
endpoint. Prometheus metrics for the HTTP app itself are served on
``/metrics``.
"""
import asyncio
import json
import logging
import os
import time
from weakref import WeakSet

import aiomqtt
import uvicorn
# v 2.0.0
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

# One asyncio.Queue per connected /api/debugEvents client. Held weakly, so a
# queue whose response generator has gone away drops out on its own even if
# explicit cleanup never ran.
debugListeners = WeakSet()

# Strong references to long-running tasks. The event loop only keeps weak
# references to tasks, so a task nobody else holds may be garbage-collected
# before it finishes.
_backgroundTasks = set()


def broadcastToDebugListners(event):
    """Queue `event`, JSON-encoded once, on every registered debug listener.

    `event` must be JSON-serializable; `json.dumps` raises `TypeError`
    otherwise. The listener queues are created unbounded, so `put_nowait`
    does not block.
    """
    j = json.dumps(event)
    for lis in debugListeners:
        lis.put_nowait(j)


async def debugEvents(request):
    """Server-sent-events endpoint streaming every broadcast debug event.

    Registers a fresh queue in `debugListeners` and returns an
    `EventSourceResponse` that yields each JSON string put on that queue.
    """
    q = asyncio.Queue()
    debugListeners.add(q)

    async def gen():
        try:
            while True:
                yield await q.get()
        finally:
            # Runs on cancellation (client disconnect), on generator close and
            # on any error. Using `finally` rather than catching
            # CancelledError lets the cancellation propagate to the caller
            # instead of being swallowed here.
            debugListeners.discard(q)

    return EventSourceResponse(gen())


async def mqttTask():
    """Read messages from the subscribed MQTT topic and broadcast them.

    Runs until the MQTT connection fails or the task is cancelled. Each
    message is broadcast together with a placeholder metric event.
    """
    client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
    async with client:
        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
        async for mqttMessage in client.messages:
            message = {
                'topic': mqttMessage.topic.value,
                'payload': mqttMessage.payload,
                't': round(time.time(), 3),
            }
            if isinstance(message['payload'], (bytes, bytearray)):
                # Raw byte payloads are not JSON-serializable. Decode
                # leniently so one malformed payload cannot raise and end the
                # whole subscription loop.
                message['payload'] = message['payload'].decode('utf-8', errors='replace')
            # Dummy conversion: the metric event does not depend on the
            # message yet.
            metricEvent = {
                'name': 'mmm',
                'labels': [{
                    'labelName': 'x',
                    'labelValue': 'y'
                }],
                'value': '0',
            }
            print(message, metricEvent)
            print("len(debugListeners)", len(debugListeners))
            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})


def _onBackgroundTaskDone(task):
    """Release a finished background task and log the exception that ended it."""
    _backgroundTasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        log.error("background task %s failed", task.get_name(), exc_info=exc)


def main():
    """Build the Starlette app and serve it with uvicorn on port 8001.

    The MQTT reader is started from the app's startup hook on the same event
    loop that runs the server.
    """
    loop = asyncio.new_event_loop()

    def startMqttTask():
        task = loop.create_task(mqttTask(), name='mqttTask')
        # Keep a strong reference until the task finishes, and make sure a
        # crash in the reader is reported instead of disappearing silently.
        _backgroundTasks.add(task)
        task.add_done_callback(_onBackgroundTaskDone)

    app = Starlette(on_startup=[startMqttTask], debug=True, routes=[
        Route('/api/debugEvents', debugEvents),
    ])

    app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
    app.add_route("/metrics", handle_metrics)

    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())


# NOTE(review): runs unconditionally, including when this module is imported.
# Kept as-is so existing launchers behave the same; consider guarding with
# `if __name__ == "__main__":`.
main()