changeset 2:579df3a4e62d

rewrite converters as register'able functions
author drewp@bigasterisk.com
date Fri, 09 Aug 2024 17:37:00 -0700
parents 3d7f2dc9beec
children 41e36f98f3b8
files convert.py mqtt_metrics.py src/main.ts
diffstat 3 files changed, 87 insertions(+), 25 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/convert.py	Fri Aug 09 17:37:00 2024 -0700
@@ -0,0 +1,38 @@
+converters = []
+
+
+def topic(topic_pattern: str):
+
+    def decorator(func):
+        func.topic_pattern = topic_pattern
+        converters.append(func)
+        return func
+
+    return decorator
+
+
+@topic(r'([^-]+)-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
+def pm(message, topicGroups: tuple[str]):
+    return {
+        'name': 'air_quality_pm',
+        'labels': [{
+            'labelName': 'location',
+            'labelValue': topicGroups[0],
+        }, {
+            'labelName': 'size',
+            'labelValue': '10',
+        }],
+        'value': message['payload'],
+    }
+
+
+@topic(r'([^-]+)-air-quality/sensor/air_temperature_c/state')
+def air_temp(message, topicGroups: tuple[str]):
+    return {
+        'name': 'air_temperature_f',
+        'labels': [{
+            'labelName': 'location',
+            'labelValue': topicGroups[0],
+        }],
+        'value': (float(message['payload']) * 9 / 5) + 32,
+    }
--- a/mqtt_metrics.py	Fri Aug 09 16:59:06 2024 -0700
+++ b/mqtt_metrics.py	Fri Aug 09 17:37:00 2024 -0700
@@ -1,32 +1,34 @@
 import asyncio
 import json
 import logging
-import os
+import re
 import time
+from typing import cast
 from weakref import WeakSet
 
 import aiomqtt
 import uvicorn  # v 2.0.0
 from sse_starlette.sse import EventSourceResponse
 from starlette.applications import Starlette
+from starlette.routing import Route
 from starlette.requests import Request
-from starlette.responses import JSONResponse
-from starlette.routing import Route
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
+from convert import converters
+
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger()
 
 debugListeners = WeakSet()
 
 
-def broadcastToDebugListners(event):
+def broadcastToDebugListeners(event: dict):
     j = json.dumps(event)
     for lis in debugListeners:
         lis.put_nowait(j)
 
 
-async def debugEvents(request):
+async def debugEvents(request: Request) -> EventSourceResponse:
     q = asyncio.Queue()
     debugListeners.add(q)
 
@@ -41,29 +43,51 @@
     return resp
 
 
+def simplifyMqttMessage(mqttMessage: aiomqtt.Message) -> dict:
+    message = {
+        't': round(time.time(), 3),
+        'topic': mqttMessage.topic.value,
+        'payload': mqttMessage.payload,
+    }
+    if isinstance(message['payload'], bytes):
+        message['payload'] = message['payload'].decode('utf-8')
+
+    return message
+
+
 async def mqttTask():
     client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
     async with client:
-        await client.subscribe('rr-air-quality/sensor/particulate_matter__10_0__m_concentration/state')
+        await client.subscribe('#')
         async for mqttMessage in client.messages:
-            message = {
-                'topic': mqttMessage.topic.value,
-                'payload': mqttMessage.payload,
-                't': round(time.time(), 3),
-            }
-            if isinstance(message['payload'], bytes):
-                message['payload'] = message['payload'].decode('utf-8')
-            metricEvent = {
-                'name': 'mmm',
-                'labels': [{
-                    'labelName': 'x',
-                    'labelValue': 'y'
-                }],
-                'value': '0',
-            }
-            print(message, metricEvent)
-            print("len(debugListeners)", len(debugListeners))
-            broadcastToDebugListners({'message': message, 'metricEvent': metricEvent})
+            onMqttMessage(mqttMessage)
+
+
+def onMqttMessage(mqttMessage):
+    message = simplifyMqttMessage(mqttMessage)
+
+    metricEvent = tryConverters(message)
+    if metricEvent:
+        broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
+
+
+def tryConverters(message) -> dict | None:
+    metricEvent = None
+    matchedPats = []
+    for converter in converters:
+        if not (m := re.search(converter.topic_pattern, message['topic'])):
+            continue
+        if not matchedPats:
+            try:
+                metricEvent = converter(message, cast(tuple[str], m.groups()))
+            except Exception:
+                log.error("Error converting mqtt message: topic pattern = %r", converter.topic_pattern, exc_info=True)
+            matchedPats.append(converter.topic_pattern)
+
+    if len(matchedPats) > 1:
+        log.warning("Multiple patterns matched: %s", ", ".join(matchedPats))
+
+    return metricEvent
 
 
 def main():
--- a/src/main.ts	Fri Aug 09 16:59:06 2024 -0700
+++ b/src/main.ts	Fri Aug 09 17:37:00 2024 -0700
@@ -98,7 +98,7 @@
         <span class="msgKey">t</span>=<span class="msgValue"> ${t.toLocaleString("sv")} </span> <span class="msgKey">topic</span>=<span class="msgValue"
           >${ev.message.topic}</span
         >
-        <span class="msgKey">message</span>=<span class="msgValue">${ev.message.payload}</span>
+        <span class="msgKey">payload</span>=<span class="msgValue">${ev.message.payload}</span>
       </div>
       <div>
         Converted to metric: