changeset 1:3d7f2dc9beec

read 1 mqtt topic; dummy convert; route to debugging view
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 16:59:06 -0700
parents 0b5b4ede1bf5
children 579df3a4e62d
files deploy.yaml mqtt_metrics.py pdm.lock pyproject.toml src/main.ts
diffstat 5 files changed, 167 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/deploy.yaml	Fri Aug 09 15:09:22 2024 -0700
+++ b/deploy.yaml	Fri Aug 09 16:59:06 2024 -0700
@@ -40,11 +40,8 @@
           command:
           - pdm
           - run
-          - uvicorn
-          - '--port=8001'
-          - '--host=0.0.0.0'
-          - '--reload'
-          - 'mqtt_metrics:app'
+          - python
+          - 'mqtt_metrics.py'
 ---
 apiVersion: v1
 kind: Service
--- a/mqtt_metrics.py	Fri Aug 09 15:09:22 2024 -0700
+++ b/mqtt_metrics.py	Fri Aug 09 16:59:06 2024 -0700
@@ -1,5 +1,13 @@
+import asyncio
+import json
 import logging
+import os
+import time
+from weakref import WeakSet
 
+import aiomqtt
+import uvicorn  # v 2.0.0
+from sse_starlette.sse import EventSourceResponse
 from starlette.applications import Starlette
 from starlette.requests import Request
 from starlette.responses import JSONResponse
@@ -9,20 +17,65 @@
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger()
 
+debugListeners = WeakSet()
 
-def hello(request: Request) -> JSONResponse:
-    return JSONResponse({"demo": "hello"})
+
+def broadcastToDebugListners(event):
+    j = json.dumps(event)
+    for lis in debugListeners:
+        lis.put_nowait(j)
+
+
+async def debugEvents(request):
+    q = asyncio.Queue()
+    debugListeners.add(q)
+
+    async def gen():
+        try:
+            while True:
+                yield await q.get()
+        except asyncio.CancelledError:
+            debugListeners.discard(q)
+
+    resp = EventSourceResponse(gen())
+    return resp
+
+
+async def mqttTask():
+    client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
+    async with client:
+        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
+        async for mqttMessage in client.messages:
+            message = {
+                'topic': mqttMessage.topic.value,
+                'payload': mqttMessage.payload,
+                't': round(time.time(), 3),
+            }
+            if isinstance(message['payload'], bytes):
+                message['payload'] = message['payload'].decode('utf-8')
+            metricEvent = {
+                'name': 'mmm',
+                'labels': [{
+                    'labelName': 'x',
+                    'labelValue': 'y'
+                }],
+                'value': '0',
+            }
+            print(message, metricEvent)
+            print("len(debugListeners)", len(debugListeners))
+            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})
 
 
 def main():
-    app = Starlette(debug=True, routes=[
-        Route('/api/hello', hello),
+    loop = asyncio.new_event_loop()
+    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
+        Route('/api/debugEvents', debugEvents),
     ])
-
     app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
     app.add_route("/metrics", handle_metrics)
-
-    return app
+    config = uvicorn.Config(app=app, host='0.0.0.0', port=8001, loop='asyncio')
+    server = uvicorn.Server(config)
+    loop.run_until_complete(server.serve())
 
 
-app = main()
+main()
--- a/pdm.lock	Fri Aug 09 15:09:22 2024 -0700
+++ b/pdm.lock	Fri Aug 09 16:59:06 2024 -0700
@@ -5,7 +5,7 @@
 groups = ["default"]
 strategy = ["cross_platform", "inherit_metadata"]
 lock_version = "4.4.1"
-content_hash = "sha256:a0e63f7479692d7acf7ccdd1794e368f6d5effbc17b0d913c369821c3a526318"
+content_hash = "sha256:89405b3666ce0bed38558b1364992f0904aaee8c528d03f15e3bd01e2b556953"
 
 [[package]]
 name = "aiohappyeyeballs"
@@ -67,6 +67,20 @@
 ]
 
 [[package]]
+name = "aiomqtt"
+version = "2.3.0"
+requires_python = "<4.0,>=3.8"
+summary = "The idiomatic asyncio MQTT client, wrapped around paho-mqtt"
+groups = ["default"]
+dependencies = [
+    "paho-mqtt<3.0.0,>=2.1.0",
+]
+files = [
+    {file = "aiomqtt-2.3.0-py3-none-any.whl", hash = "sha256:127926717bd6b012d1630f9087f24552eb9c4af58205bc2964f09d6e304f7e63"},
+    {file = "aiomqtt-2.3.0.tar.gz", hash = "sha256:312feebe20bc76dc7c20916663011f3bd37aa6f42f9f687a19a1c58308d80d47"},
+]
+
+[[package]]
 name = "aiosignal"
 version = "1.3.1"
 requires_python = ">=3.7"
@@ -416,6 +430,17 @@
 ]
 
 [[package]]
+name = "paho-mqtt"
+version = "2.1.0"
+requires_python = ">=3.7"
+summary = "MQTT version 5.0/3.1.1 client class"
+groups = ["default"]
+files = [
+    {file = "paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee"},
+    {file = "paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834"},
+]
+
+[[package]]
 name = "patchablegraph"
 version = "1.5.0"
 requires_python = ">=3.9"
--- a/pyproject.toml	Fri Aug 09 15:09:22 2024 -0700
+++ b/pyproject.toml	Fri Aug 09 16:59:06 2024 -0700
@@ -10,10 +10,11 @@
     "starlette-exporter>=0.13.0",
     "starlette>=0.20.4",
     "uvicorn[standard]>=0.18.2",
-
     "background-loop>=1.3.0",
     "patchablegraph>=1.5.0",
     "rdfdb==0.24.0",
+    "aiomqtt>=2.3.0",
+    "sse-starlette>=2.1.3",
 ]
 requires-python = ">=3.11"
 license = {text = "MIT"}
--- a/src/main.ts	Fri Aug 09 15:09:22 2024 -0700
+++ b/src/main.ts	Fri Aug 09 16:59:06 2024 -0700
@@ -1,5 +1,21 @@
-import { LitElement, TemplateResult, css, html } from "lit";
-import { customElement } from "lit/decorators.js";
+import { LitElement, PropertyValues, TemplateResult, css, html } from "lit";
+import { customElement, property, state } from "lit/decorators.js";
+
+type ConversionEvent = {
+  message: {
+    t: number;
+    topic: string;
+    payload: string;
+  };
+  metricEvent: {
+    name: string;
+    labels: {
+      labelName: string;
+      labelValue: string;
+    }[];
+    value: string;
+  };
+};
 
 @customElement("mm-page")
 export class MmPage extends LitElement {
@@ -8,19 +24,51 @@
       :host {
         display: flex;
         flex-direction: column;
-        height: 100vh;
+        min-height: 100vh;
         background: #121212;
         color: white;
         text-shadow: 0.6px 0.6px 0px #ffffff70;
         font-family: monospace;
         font-size: 135%;
       }
+    `,
+  ];
+  @state() displayedMessages: ConversionEvent[] = [];
+
+  protected firstUpdated(_changedProperties: PropertyValues): void {
+    super.firstUpdated(_changedProperties);
+    new EventSource("api/debugEvents").addEventListener("message", (ev) => {
+      this.displayedMessages.push(JSON.parse(ev.data));
+      const maxShown = 20;
+      if (this.displayedMessages.length > maxShown) {
+        this.displayedMessages = this.displayedMessages.slice(-maxShown);
+      }
+      this.requestUpdate();
+    });
+  }
+
+  render() {
+    return html`
+      <h1>Mqtt metrics</h1>
+
+      ${this.displayedMessages.map((ev) => html`<mm-conversion-event .ev=${ev}></mm-conversion-event>`)}
+      <p><a href="metrics">metrics</a> |</p>
+      <bigast-loginbar></bigast-loginbar>
+    `;
+  }
+}
+
+@customElement("mm-conversion-event")
+export class MmConversionEvent extends LitElement {
+  static styles = [
+    css`
       div.message {
         color: #888;
+        margin-bottom: 1em;
       }
       .msgKey {
         color: #006699;
-        padding-left: 14px;
+        padding-left: 1em;
       }
       .msgValue {
         color: #ff4500;
@@ -39,33 +87,32 @@
       }
     `,
   ];
+  @property() ev?: ConversionEvent;
   render() {
-    return html`
-      <h1>Mqtt metrics</h1>
+    if (!this.ev) return html``;
+    const ev = this.ev;
+    const t = new Date(ev.message.t * 1000);
+    return html`<div class="message">
+      <div>
+        Incoming mqtt message:
+        <span class="msgKey">t</span>=<span class="msgValue"> ${t.toLocaleString("sv")} </span> <span class="msgKey">topic</span>=<span class="msgValue"
+          >${ev.message.topic}</span
+        >
+        <span class="msgKey">message</span>=<span class="msgValue">${ev.message.payload}</span>
+      </div>
+      <div>
+        Converted to metric:
+        <span class="metricName">${ev.metricEvent.name}</span>
 
-      <div class="message">
-        <div>
-          Incoming mqtt message: <span class="msgKey">t</span>=<span class="msgValue">${"2024-08-09 14:43:28"}</span> <span class="msgKey">topic</span>=<span
-            class="msgValue"
-            >${"tr-air-quality/sensor/particulate_matter__10_0__m_concentration/state"}</span
-          >
-          <span class="msgKey">message</span>=<span class="msgValue">${"12"}</span>
-        </div>
-        <div>
-          Converted to metrics event:
-          <span class="metricName">pm_concentration</span>
-          {
-          <span class="metricLabelName">location</span>="<span class="metricLabelValue">rr</span>", <span class="metricLabelName">size</span>="<span
-            class="metricLabelValue"
-            >10</span
-          >" } value=<span class="metricValue">12</span>
-        </div>
+        {${ev.metricEvent.labels.map(
+          (l, i) => html`
+            <span class="metricLabelName">${l.labelName}</span>=<span class="metricLabelValue">${l.labelValue}</span>
+
+            ${i < ev.metricEvent.labels.length - 1 ? "," : "}"}
+          `
+        )}
+        value=<span class="metricValue">${ev.metricEvent.value}</span>
       </div>
-
-      <p>
-        <a href="metrics">metrics</a> |
-      </p>
-      <bigast-loginbar></bigast-loginbar>
-    `;
+    </div>`;
   }
 }