view mqtt_metrics.py @ 1:3d7f2dc9beec

read 1 mqtt topic; dummy convert; route to debugging view
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 16:59:06 -0700
parents 0b5b4ede1bf5
children 579df3a4e62d
line wrap: on
line source

import asyncio
import json
import logging
import os
import time
from weakref import WeakSet

import aiomqtt
import uvicorn  # v 2.0.0
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
from starlette_exporter import PrometheusMiddleware, handle_metrics

logging.basicConfig(level=logging.INFO)
log = logging.getLogger()

debugListeners = WeakSet()


def broadcastToDebugListners(event):
    j = json.dumps(event)
    for lis in debugListeners:
        lis.put_nowait(j)


async def debugEvents(request):
    q = asyncio.Queue()
    debugListeners.add(q)

    async def gen():
        try:
            while True:
                yield await q.get()
        except asyncio.CancelledError:
            debugListeners.discard(q)

    resp = EventSourceResponse(gen())
    return resp


async def mqttTask():
    client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
    async with client:
        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
        async for mqttMessage in client.messages:
            message = {
                'topic': mqttMessage.topic.value,
                'payload': mqttMessage.payload,
                't': round(time.time(), 3),
            }
            if isinstance(message['payload'], bytes):
                message['payload'] = message['payload'].decode('utf-8')
            metricEvent = {
                'name': 'mmm',
                'labels': [{
                    'labelName': 'x',
                    'labelValue': 'y'
                }],
                'value': '0',
            }
            print(message, metricEvent)
            print("len(debugListeners)", len(debugListeners))
            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})


def main():
    loop = asyncio.new_event_loop()
    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
        Route('/api/debugEvents', debugEvents),
    ])
    app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
    app.add_route("/metrics", handle_metrics)
    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())


main()