changeset 4:cd1b8d7bda78

get metrics writing to victoriametrics
author drewp@bigasterisk.com
date Sat, 10 Aug 2024 21:16:19 -0700
parents 41e36f98f3b8
children 8390d5d0d512
files convert.py mqtt_metrics.py pdm.lock pyproject.toml victoriametrics_write.py
diffstat 5 files changed, 87 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/convert.py	Fri Aug 09 18:36:39 2024 -0700
+++ b/convert.py	Sat Aug 10 21:16:19 2024 -0700
@@ -1,3 +1,6 @@
+'''
+Note that these functions need to parse the message['payload'] into a float
+'''
 converters = []
 
 
@@ -22,7 +25,7 @@
             'labelName': 'size',
             'labelValue': '10',
         }],
-        'value': message['payload'],
+        'value': float(message['payload']),
     }
 
 
--- a/mqtt_metrics.py	Fri Aug 09 18:36:39 2024 -0700
+++ b/mqtt_metrics.py	Sat Aug 10 21:16:19 2024 -0700
@@ -1,3 +1,4 @@
+import textwrap
 import asyncio
 import json
 import logging
@@ -16,9 +17,11 @@
 from starlette_exporter import PrometheusMiddleware, handle_metrics
 
 from convert import converters
+from victoriametrics_write import MetricsWriter
 
 logging.basicConfig(level=logging.INFO)
 log = logging.getLogger()
+logging.getLogger('httpx').setLevel(logging.WARNING)
 
 debugListeners = WeakSet()
 
@@ -56,26 +59,27 @@
     return message
 
 
-async def mqttTask():
+async def mqttTask(metrics: MetricsWriter):
     try:
         client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter")
         async with client:
             await client.subscribe('#')
             async for mqttMessage in client.messages:
                 try:
-                    onMqttMessage(mqttMessage)
+                    onMqttMessage(metrics, mqttMessage)
                 except Exception as e:
-                    log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e)
-
+                    log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200))
     except Exception:
         log.error("mqtt task failed", exc_info=True)
         os.abort()
 
-def onMqttMessage(mqttMessage):
+
+def onMqttMessage(metrics, mqttMessage):
     message = simplifyMqttMessage(mqttMessage)
 
     metricEvent = tryConverters(message)
     if metricEvent:
+        metrics.write(message['t'], metricEvent)
         broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent})
 
 
@@ -99,8 +103,9 @@
 
 
 def main():
+    metrics = MetricsWriter()
     loop = asyncio.new_event_loop()
-    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[
+    app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[
         Route('/api/debugEvents', debugEvents),
     ])
     app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics')
--- a/pdm.lock	Fri Aug 09 18:36:39 2024 -0700
+++ b/pdm.lock	Sat Aug 10 21:16:19 2024 -0700
@@ -5,7 +5,7 @@
 groups = ["default"]
 strategy = ["cross_platform", "inherit_metadata"]
 lock_version = "4.4.1"
-content_hash = "sha256:89405b3666ce0bed38558b1364992f0904aaee8c528d03f15e3bd01e2b556953"
+content_hash = "sha256:5591ac6c868192569633481cf9523511d8d2bc6641f0e88d7e06922b1e896ede"
 
 [[package]]
 name = "aiohappyeyeballs"
@@ -162,6 +162,17 @@
 ]
 
 [[package]]
+name = "certifi"
+version = "2024.7.4"
+requires_python = ">=3.6"
+summary = "Python package for providing Mozilla's CA Bundle."
+groups = ["default"]
+files = [
+    {file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
+    {file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
+]
+
+[[package]]
 name = "cffi"
 version = "1.17.0"
 requires_python = ">=3.8"
@@ -327,6 +338,21 @@
 ]
 
 [[package]]
+name = "httpcore"
+version = "1.0.5"
+requires_python = ">=3.8"
+summary = "A minimal low-level HTTP client."
+groups = ["default"]
+dependencies = [
+    "certifi",
+    "h11<0.15,>=0.13",
+]
+files = [
+    {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"},
+    {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"},
+]
+
+[[package]]
 name = "httptools"
 version = "0.6.1"
 requires_python = ">=3.8.0"
@@ -351,6 +377,24 @@
 ]
 
 [[package]]
+name = "httpx"
+version = "0.27.0"
+requires_python = ">=3.8"
+summary = "The next generation HTTP client."
+groups = ["default"]
+dependencies = [
+    "anyio",
+    "certifi",
+    "httpcore==1.*",
+    "idna",
+    "sniffio",
+]
+files = [
+    {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"},
+    {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"},
+]
+
+[[package]]
 name = "hyperlink"
 version = "21.0.0"
 requires_python = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
--- a/pyproject.toml	Fri Aug 09 18:36:39 2024 -0700
+++ b/pyproject.toml	Sat Aug 10 21:16:19 2024 -0700
@@ -15,6 +15,7 @@
     "rdfdb==0.24.0",
     "aiomqtt>=2.3.0",
     "sse-starlette>=2.1.3",
+    "httpx>=0.27.0",
 ]
 requires-python = ">=3.11"
 license = {text = "MIT"}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/victoriametrics_write.py	Sat Aug 10 21:16:19 2024 -0700
@@ -0,0 +1,26 @@
+import httpx
+import logging
+log = logging.getLogger()
+
+class MetricsWriter:
+    # todo: this could merge quick writes, keep the connection open, etc.
+    agentImportUrl = "http://victoriametrics-forever-vmagent/m/forever/vmagent/api/v1/import"
+
+    def write(self, t, metricEvent):
+        # To see inserted data, try this:
+        #   curl http://`khost victoriametrics-vmselect`/m/vmselect/select/0/prometheus/api/v1/export -d 'match[]=vm_http_request_errors_total'
+        labelDict = dict((x['labelName'], x['labelValue']) for x in metricEvent['labels'])
+        promBody = {
+            "metric": {
+                "__name__": metricEvent['name']
+            } | labelDict,
+            "values": [metricEvent['value']],
+            "timestamps": [int(t * 1000)],
+        }
+        log.info(promBody)
+        req = httpx.post(
+            self.agentImportUrl,
+            json=promBody,
+            headers={'content-type': 'application/stream+json'},
+        )
+        req.raise_for_status()