# HG changeset patch # User drewp@bigasterisk.com # Date 1723247946 25200 # Node ID 3d7f2dc9beece8c521d7ecfc3dc2e32de4a38c2e # Parent 0b5b4ede1bf528d17a48377045ea1546fca78095 read 1 mqtt topic; dummy convert; route to debugging view diff -r 0b5b4ede1bf5 -r 3d7f2dc9beec deploy.yaml --- a/deploy.yaml Fri Aug 09 15:09:22 2024 -0700 +++ b/deploy.yaml Fri Aug 09 16:59:06 2024 -0700 @@ -40,11 +40,8 @@ command: - pdm - run - - uvicorn - - '--port=8001' - - '--host=0.0.0.0' - - '--reload' - - 'mqtt_metrics:app' + - python + - 'mqtt_metrics.py' --- apiVersion: v1 kind: Service diff -r 0b5b4ede1bf5 -r 3d7f2dc9beec mqtt_metrics.py --- a/mqtt_metrics.py Fri Aug 09 15:09:22 2024 -0700 +++ b/mqtt_metrics.py Fri Aug 09 16:59:06 2024 -0700 @@ -1,5 +1,13 @@ +import asyncio +import json import logging +import os +import time +from weakref import WeakSet +import aiomqtt +import uvicorn # v 2.0.0 +from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse @@ -9,20 +17,65 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger() +debugListeners = WeakSet() -def hello(request: Request) -> JSONResponse: - return JSONResponse({"demo": "hello"}) + +def broadcastToDebugListners(event): + j = json.dumps(event) + for lis in debugListeners: + lis.put_nowait(j) + + +async def debugEvents(request): + q = asyncio.Queue() + debugListeners.add(q) + + async def gen(): + try: + while True: + yield await q.get() + except asyncio.CancelledError: + debugListeners.discard(q) + + resp = EventSourceResponse(gen()) + return resp + + +async def mqttTask(): + client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") + async with client: + await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') + async for mqttMessage in client.messages: + message = { + 'topic': mqttMessage.topic.value, + 'payload': mqttMessage.payload, + 't': round(time.time(), 3), + } + if isinstance(message['payload'], bytes): + message['payload'] = message['payload'].decode('utf-8') + metricEvent = { + 'name': 'mmm', + 'labels': [{ + 'labelName': 'x', + 'labelValue': 'y' + }], + 'value': '0', + } + print(message, metricEvent) + print("len(debugListeners)", len(debugListeners)) + broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) def main(): - app = Starlette(debug=True, routes=[ - Route('/api/hello', hello), + loop = asyncio.new_event_loop() + app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ + Route('/api/debugEvents', debugEvents), ]) - app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') app.add_route("/metrics", handle_metrics) - - return app + config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') + server = uvicorn.Server(config) + loop.run_until_complete(server.serve()) -app = main() +main() diff -r 0b5b4ede1bf5 -r 3d7f2dc9beec pdm.lock --- a/pdm.lock Fri Aug 09 15:09:22 2024 -0700 +++ b/pdm.lock Fri Aug 09 16:59:06 2024 -0700 @@ -5,7 +5,7 @@ groups = ["default"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:a0e63f7479692d7acf7ccdd1794e368f6d5effbc17b0d913c369821c3a526318" +content_hash = "sha256:89405b3666ce0bed38558b1364992f0904aaee8c528d03f15e3bd01e2b556953" [[package]] name = "aiohappyeyeballs" @@ -67,6 +67,20 @@ ] [[package]] +name = "aiomqtt" +version = "2.3.0" +requires_python = "<4.0,>=3.8" +summary = "The idiomatic asyncio MQTT client, wrapped around paho-mqtt" +groups = ["default"] +dependencies = [ + "paho-mqtt<3.0.0,>=2.1.0", +] +files = [ + {file = "aiomqtt-2.3.0-py3-none-any.whl", hash = "sha256:127926717bd6b012d1630f9087f24552eb9c4af58205bc2964f09d6e304f7e63"}, + {file = "aiomqtt-2.3.0.tar.gz", hash = "sha256:312feebe20bc76dc7c20916663011f3bd37aa6f42f9f687a19a1c58308d80d47"}, +] + +[[package]] name = "aiosignal" version = "1.3.1" requires_python = ">=3.7" @@ -416,6 +430,17 @@ ] [[package]] +name = "paho-mqtt" +version = "2.1.0" +requires_python = ">=3.7" +summary = "MQTT version 5.0/3.1.1 client class" +groups = ["default"] +files = [ + {file = "paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"}, + {file = "paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834"}, +] + +[[package]] name = "patchablegraph" version = "1.5.0" requires_python = ">=3.9" diff -r 0b5b4ede1bf5 -r 3d7f2dc9beec pyproject.toml --- a/pyproject.toml Fri Aug 09 15:09:22 2024 -0700 +++ b/pyproject.toml Fri Aug 09 16:59:06 2024 -0700 @@ -10,10 +10,11 @@ "starlette-exporter>=0.13.0", "starlette>=0.20.4", "uvicorn[standard]>=0.18.2", - "background-loop>=1.3.0", "patchablegraph>=1.5.0", "rdfdb==0.24.0", + "aiomqtt>=2.3.0", + "sse-starlette>=2.1.3", ] requires-python = ">=3.11" license = {text = "MIT"} diff -r 0b5b4ede1bf5 -r 3d7f2dc9beec src/main.ts --- a/src/main.ts Fri Aug 09 15:09:22 2024 -0700 +++ b/src/main.ts Fri Aug 09 16:59:06 2024 -0700 @@ -1,5 +1,21 @@ -import { LitElement, TemplateResult, css, html } from "lit"; -import { customElement } from "lit/decorators.js"; +import { LitElement, PropertyValues, TemplateResult, css, html } from "lit"; +import { customElement, property, state } from "lit/decorators.js"; + +type ConversionEvent = { + message: { + t: number; + topic: string; + payload: string; + }; + metricEvent: { + name: string; + labels: { + labelName: string; + labelValue: string; + }[]; + value: string; + }; +}; @customElement("mm-page") export class MmPage extends LitElement { @@ -8,19 +24,51 @@ :host { display: flex; flex-direction: column; - height: 100vh; + min-height: 100vh; background: #121212; color: white; text-shadow: 0.6px 0.6px 0px #ffffff70; font-family: monospace; font-size: 135%; } + `, + ]; + @state() displayedMessages: ConversionEvent[] = []; + + protected firstUpdated(_changedProperties: PropertyValues): void { + super.firstUpdated(_changedProperties); + new EventSource("api/debugEvents").addEventListener("message", (ev) => { + this.displayedMessages.push(JSON.parse(ev.data)); + const maxShown = 20; + if (this.displayedMessages.length > maxShown) { + this.displayedMessages = this.displayedMessages.slice(-maxShown); + } + this.requestUpdate(); + }); + } + + render() { + return html` +
metrics |
+