diff mqtt_metrics.py @ 1:3d7f2dc9beec

read 1 mqtt topic; dummy convert; route to debugging view
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 16:59:06 -0700
parents 0b5b4ede1bf5
children 579df3a4e62d
line wrap: on
line diff
--- a/mqtt_metrics.py	Fri Aug 09 15:09:22 2024 -0700
+++ b/mqtt_metrics.py	Fri Aug 09 16:59:06 2024 -0700
@@ -1,5 +1,13 @@
+import asyncio
+import json
 import logging
+import os
+import time
+from weakref import WeakSet
 
+import aiomqtt
+import uvicorn  # v 2.0.0
+from sse_starlette.sse import EventSourceResponse
 from starlette.applications import Starlette
 from starlette.requests import Request
 from starlette.responses import JSONResponse
@@ -9,20 +17,65 @@
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger()
 
+debugListeners = WeakSet()
 
-def hello(request: Request) -> JSONResponse:
-    return JSONResponse({"demo": "hello"})
+
+def broadcastToDebugListners(event):
+    j = json.dumps(event)
+    for lis in debugListeners:
+        lis.put_nowait(j)
+
+
+async def debugEvents(request):
+    q = asyncio.Queue()
+    debugListeners.add(q)
+
+    async def gen():
+        try:
+            while True:
+                yield await q.get()
+        except asyncio.CancelledError:
+            debugListeners.discard(q)
+
+    resp = EventSourceResponse(gen())
+    return resp
+
+
+async def mqttTask():
+    client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
+    async with client:
+        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
+        async for mqttMessage in client.messages:
+            message = {
+                'topic': mqttMessage.topic.value,
+                'payload': mqttMessage.payload,
+                't': round(time.time(), 3),
+            }
+            if isinstance(message['payload'], bytes):
+                message['payload'] = message['payload'].decode('utf-8')
+            metricEvent = {
+                'name': 'mmm',
+                'labels': [{
+                    'labelName': 'x',
+                    'labelValue': 'y'
+                }],
+                'value': '0',
+            }
+            print(message, metricEvent)
+            print("len(debugListeners)", len(debugListeners))
+            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})
 
 
 def main():
-    app = Starlette(debug=True, routes=[
-        Route('/api/hello', hello),
+    loop = asyncio.new_event_loop()
+    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
+        Route('/api/debugEvents', debugEvents),
     ])
-
     app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
     app.add_route("/metrics", handle_metrics)
-
-    return app
+    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
+    server = uvicorn.Server(config)
+    loop.run_until_complete(server.serve())
 
 
-app = main()
+main()