Mercurial > code > home > repos > mqtt_metrics
changeset 1:3d7f2dc9beec
read 1 mqtt topic; dummy convert; route to debugging view
author | drewp@bigasterisk.com |
---|---|
date | Fri, 09 Aug 2024 16:59:06 -0700 |
parents | 0b5b4ede1bf5 |
children | 579df3a4e62d |
files | deploy.yaml mqtt_metrics.py pdm.lock pyproject.toml src/main.ts |
diffstat | 5 files changed, 167 insertions(+), 44 deletions(-) [+] |
line wrap: on
line diff
--- a/deploy.yaml Fri Aug 09 15:09:22 2024 -0700 +++ b/deploy.yaml Fri Aug 09 16:59:06 2024 -0700 @@ -40,11 +40,8 @@ command: - pdm - run - - uvicorn - - '--port=8001' - - '--host=0.0.0.0' - - '--reload' - - 'mqtt_metrics:app' + - python + - 'mqtt_metrics.py' --- apiVersion: v1 kind: Service
--- a/mqtt_metrics.py Fri Aug 09 15:09:22 2024 -0700 +++ b/mqtt_metrics.py Fri Aug 09 16:59:06 2024 -0700 @@ -1,5 +1,13 @@ +import asyncio +import json import logging +import os +import time +from weakref import WeakSet +import aiomqtt +import uvicorn # v 2.0.0 +from sse_starlette.sse import EventSourceResponse from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse @@ -9,20 +17,65 @@ logging.basicConfig(level=logging.INFO) log = logging.getLogger() +debugListeners = WeakSet() -def hello(request: Request) -> JSONResponse: - return JSONResponse({"demo": "hello"}) + +def broadcastToDebugListners(event): + j = json.dumps(event) + for lis in debugListeners: + lis.put_nowait(j) + + +async def debugEvents(request): + q = asyncio.Queue() + debugListeners.add(q) + + async def gen(): + try: + while True: + yield await q.get() + except asyncio.CancelledError: + debugListeners.discard(q) + + resp = EventSourceResponse(gen()) + return resp + + +async def mqttTask(): + client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") + async with client: + await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state') + async for mqttMessage in client.messages: + message = { + 'topic': mqttMessage.topic.value, + 'payload': mqttMessage.payload, + 't': round(time.time(), 3), + } + if isinstance(message['payload'], bytes): + message['payload'] = message['payload'].decode('utf-8') + metricEvent = { + 'name': 'mmm', + 'labels': [{ + 'labelName': 'x', + 'labelValue': 'y' + }], + 'value': '0', + } + print(message, metricEvent) + print("len(debugListeners)", len(debugListeners)) + broadcastToDebugListners({'message': message, 'metricEvent': metricEvent}) def main(): - app = Starlette(debug=True, routes=[ - Route('/api/hello', hello), + loop = asyncio.new_event_loop() + app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ + Route('/api/debugEvents', debugEvents), ]) - app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') app.add_route("/metrics", handle_metrics) - - return app + config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio') + server = uvicorn.Server(config) + loop.run_until_complete(server.serve()) -app = main() +main()
--- a/pdm.lock Fri Aug 09 15:09:22 2024 -0700 +++ b/pdm.lock Fri Aug 09 16:59:06 2024 -0700 @@ -5,7 +5,7 @@ groups = ["default"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:a0e63f7479692d7acf7ccdd1794e368f6d5effbc17b0d913c369821c3a526318" +content_hash = "sha256:89405b3666ce0bed38558b1364992f0904aaee8c528d03f15e3bd01e2b556953" [[package]] name = "aiohappyeyeballs" @@ -67,6 +67,20 @@ ] [[package]] +name = "aiomqtt" +version = "2.3.0" +requires_python = "<4.0,>=3.8" +summary = "The idiomatic asyncio MQTT client, wrapped around paho-mqtt" +groups = ["default"] +dependencies = [ + "paho-mqtt<3.0.0,>=2.1.0", +] +files = [ + {file = "aiomqtt-2.3.0-py3-none-any.whl", hash = "sha256:127926717bd6b012d1630f9087f24552eb9c4af58205bc2964f09d6e304f7e63"}, + {file = "aiomqtt-2.3.0.tar.gz", hash = "sha256:312feebe20bc76dc7c20916663011f3bd37aa6f42f9f687a19a1c58308d80d47"}, +] + +[[package]] name = "aiosignal" version = "1.3.1" requires_python = ">=3.7" @@ -416,6 +430,17 @@ ] [[package]] +name = "paho-mqtt" +version = "2.1.0" +requires_python = ">=3.7" +summary = "MQTT version 5.0/3.1.1 client class" +groups = ["default"] +files = [ + {file = "paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"}, + {file = "paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834"}, +] + +[[package]] name = "patchablegraph" version = "1.5.0" requires_python = ">=3.9"
--- a/pyproject.toml Fri Aug 09 15:09:22 2024 -0700 +++ b/pyproject.toml Fri Aug 09 16:59:06 2024 -0700 @@ -10,10 +10,11 @@ "starlette-exporter>=0.13.0", "starlette>=0.20.4", "uvicorn[standard]>=0.18.2", - "background-loop>=1.3.0", "patchablegraph>=1.5.0", "rdfdb==0.24.0", + "aiomqtt>=2.3.0", + "sse-starlette>=2.1.3", ] requires-python = ">=3.11" license = {text = "MIT"}
--- a/src/main.ts Fri Aug 09 15:09:22 2024 -0700 +++ b/src/main.ts Fri Aug 09 16:59:06 2024 -0700 @@ -1,5 +1,21 @@ -import { LitElement, TemplateResult, css, html } from "lit"; -import { customElement } from "lit/decorators.js"; +import { LitElement, PropertyValues, TemplateResult, css, html } from "lit"; +import { customElement, property, state } from "lit/decorators.js"; + +type ConversionEvent = { + message: { + t: number; + topic: string; + payload: string; + }; + metricEvent: { + name: string; + labels: { + labelName: string; + labelValue: string; + }[]; + value: string; + }; +}; @customElement("mm-page") export class MmPage extends LitElement { @@ -8,19 +24,51 @@ :host { display: flex; flex-direction: column; - height: 100vh; + min-height: 100vh; background: #121212; color: white; text-shadow: 0.6px 0.6px 0px #ffffff70; font-family: monospace; font-size: 135%; } + `, + ]; + @state() displayedMessages: ConversionEvent[] = []; + + protected firstUpdated(_changedProperties: PropertyValues): void { + super.firstUpdated(_changedProperties); + new EventSource("api/debugEvents").addEventListener("message", (ev) => { + this.displayedMessages.push(JSON.parse(ev.data)); + const maxShown = 20; + if (this.displayedMessages.length > maxShown) { + this.displayedMessages = this.displayedMessages.slice(-maxShown); + } + this.requestUpdate(); + }); + } + + render() { + return html` + <h1>Mqtt metrics</h1> + + ${this.displayedMessages.map((ev) => html`<mm-conversion-event .ev=${ev}></mm-conversion-event>`)} + <p><a href="metrics">metrics</a> |</p> + <bigast-loginbar></bigast-loginbar> + `; + } +} + +@customElement("mm-conversion-event") +export class MmConversionEvent extends LitElement { + static styles = [ + css` div.message { color: #888; + margin-bottom: 1em; } .msgKey { color: #006699; - padding-left: 14px; + padding-left: 1em; } .msgValue { color: #ff4500; @@ -39,33 +87,32 @@ } `, ]; + @property() ev?: ConversionEvent; render() { - return html` - <h1>Mqtt metrics</h1> + if (!this.ev) return html``; + const ev = this.ev; + const t = new Date(ev.message.t * 1000); + return html`<div class="message"> + <div> + Incoming mqtt message: + <span class="msgKey">t</span>=<span class="msgValue"> ${t.toLocaleString("sv")} </span> <span class="msgKey">topic</span>=<span class="msgValue" + >${ev.message.topic}</span + > + <span class="msgKey">message</span>=<span class="msgValue">${ev.message.payload}</span> + </div> + <div> + Converted to metric: + <span class="metricName">${ev.metricEvent.name}</span> - <div class="message"> - <div> - Incoming mqtt message: <span class="msgKey">t</span>=<span class="msgValue">${"2024-08-09 14:43:28"}</span> <span class="msgKey">topic</span>=<span - class="msgValue" - >${"tr-air-quality/sensor/particulate_matter__10_0__m_concentration/state"}</span - > - <span class="msgKey">message</span>=<span class="msgValue">${"12"}</span> - </div> - <div> - Converted to metrics event: - <span class="metricName">pm_concentration</span> - { - <span class="metricLabelName">location</span>="<span class="metricLabelValue">rr</span>", <span class="metricLabelName">size</span>="<span - class="metricLabelValue" - >10</span - >" } value=<span class="metricValue">12</span> - </div> + {${ev.metricEvent.labels.map( + (l, i) => html` + <span class="metricLabelName">${l.labelName}</span>=<span class="metricLabelValue">${l.labelValue}</span> + + ${i < ev.metricEvent.labels.length - 1 ? "," : "}"} + ` + )} + value=<span class="metricValue">${ev.metricEvent.value}</span> </div> - - <p> - <a href="metrics">metrics</a> | - </p> - <bigast-loginbar></bigast-loginbar> - `; + </div>`; } }