# HG changeset patch # User drewp@bigasterisk.com # Date 1723349779 25200 # Node ID cd1b8d7bda7877f1f284b0d472d2f87272669f47 # Parent 41e36f98f3b8b8076cf061d38aaa9e958a83fd51 get metrics writing to victoriametrics diff -r 41e36f98f3b8 -r cd1b8d7bda78 convert.py --- a/convert.py Fri Aug 09 18:36:39 2024 -0700 +++ b/convert.py Sat Aug 10 21:16:19 2024 -0700 @@ -1,3 +1,6 @@ +''' +Note that these functions need to parse the message['payload'] into a float +''' converters = [] @@ -22,7 +25,7 @@ 'labelName': 'size', 'labelValue': '10', }], - 'value': message['payload'], + 'value': float(message['payload']), } diff -r 41e36f98f3b8 -r cd1b8d7bda78 mqtt_metrics.py --- a/mqtt_metrics.py Fri Aug 09 18:36:39 2024 -0700 +++ b/mqtt_metrics.py Sat Aug 10 21:16:19 2024 -0700 @@ -1,3 +1,4 @@ +import textwrap import asyncio import json import logging @@ -16,9 +17,11 @@ from starlette_exporter import PrometheusMiddleware, handle_metrics from convert import converters +from victoriametrics_write import MetricsWriter logging.basicConfig(level=logging.INFO) log = logging.getLogger() +logging.getLogger('httpx').setLevel(logging.WARNING) debugListeners = WeakSet() @@ -56,26 +59,27 @@ return message -async def mqttTask(): +async def mqttTask(metrics: MetricsWriter): try: client = aiomqtt.Client('mqtt2.bigasterisk.com', identifier="mqtt-exporter") async with client: await client.subscribe('#') async for mqttMessage in client.messages: try: - onMqttMessage(mqttMessage) + onMqttMessage(metrics, mqttMessage) except Exception as e: - log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, e) - + log.warning("mqtt message (topic %r) failed: %r", mqttMessage.topic.value, textwrap.shorten(repr(e), width=200)) except Exception: log.error("mqtt task failed", exc_info=True) os.abort() -def onMqttMessage(mqttMessage): + +def onMqttMessage(metrics, mqttMessage): message = simplifyMqttMessage(mqttMessage) metricEvent = tryConverters(message) if metricEvent: + metrics.write(message['t'], metricEvent) broadcastToDebugListeners({'message': message, 'metricEvent': metricEvent}) @@ -99,8 +103,9 @@ def main(): + metrics = MetricsWriter() loop = asyncio.new_event_loop() - app = Starlette(on_startup=[lambda: loop.create_task(mqttTask())], debug=True, routes=[ + app = Starlette(on_startup=[lambda: loop.create_task(mqttTask(metrics))], debug=True, routes=[ Route('/api/debugEvents', debugEvents), ]) app.add_middleware(PrometheusMiddleware, app_name='mqtt_metrics') diff -r 41e36f98f3b8 -r cd1b8d7bda78 pdm.lock --- a/pdm.lock Fri Aug 09 18:36:39 2024 -0700 +++ b/pdm.lock Sat Aug 10 21:16:19 2024 -0700 @@ -5,7 +5,7 @@ groups = ["default"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:89405b3666ce0bed38558b1364992f0904aaee8c528d03f15e3bd01e2b556953" +content_hash = "sha256:5591ac6c868192569633481cf9523511d8d2bc6641f0e88d7e06922b1e896ede" [[package]] name = "aiohappyeyeballs" @@ -162,6 +162,17 @@ ] [[package]] +name = "certifi" +version = "2024.7.4" +requires_python = ">=3.6" +summary = "Python package for providing Mozilla's CA Bundle." +groups = ["default"] +files = [ + {file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"}, + {file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"}, +] + +[[package]] name = "cffi" version = "1.17.0" requires_python = ">=3.8" @@ -327,6 +338,21 @@ ] [[package]] +name = "httpcore" +version = "1.0.5" +requires_python = ">=3.8" +summary = "A minimal low-level HTTP client." +groups = ["default"] +dependencies = [ + "certifi", + "h11<0.15,>=0.13", +] +files = [ + {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"}, + {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"}, +] + +[[package]] name = "httptools" version = "0.6.1" requires_python = ">=3.8.0" @@ -351,6 +377,24 @@ ] [[package]] +name = "httpx" +version = "0.27.0" +requires_python = ">=3.8" +summary = "The next generation HTTP client." +groups = ["default"] +dependencies = [ + "anyio", + "certifi", + "httpcore==1.*", + "idna", + "sniffio", +] +files = [ + {file = "httpx-0.27.0-py3-none-any.whl", hash = "sha256:71d5465162c13681bff01ad59b2cc68dd838ea1f10e51574bac27103f00c91a5"}, + {file = "httpx-0.27.0.tar.gz", hash = "sha256:a0cb88a46f32dc874e04ee956e4c2764aba2aa228f650b06788ba6bda2962ab5"}, +] + +[[package]] name = "hyperlink" version = "21.0.0" requires_python = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" diff -r 41e36f98f3b8 -r cd1b8d7bda78 pyproject.toml --- a/pyproject.toml Fri Aug 09 18:36:39 2024 -0700 +++ b/pyproject.toml Sat Aug 10 21:16:19 2024 -0700 @@ -15,6 +15,7 @@ "rdfdb==0.24.0", "aiomqtt>=2.3.0", "sse-starlette>=2.1.3", + "httpx>=0.27.0", ] requires-python = ">=3.11" license = {text = "MIT"} diff -r 41e36f98f3b8 -r cd1b8d7bda78 victoriametrics_write.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/victoriametrics_write.py Sat Aug 10 21:16:19 2024 -0700 @@ -0,0 +1,26 @@ +import httpx +import logging +log = logging.getLogger() + +class MetricsWriter: + # todo: this could merge quick writes, keep the connection open, etc. + agentImportUrl = "http://victoriametrics-forever-vmagent/m/forever/vmagent/api/v1/import" + + def write(self, t, metricEvent): + # To see inserted data, try this: + # curl http://`khost victoriametrics-vmselect`/m/vmselect/select/0/prometheus/api/v1/export -d 'match[]=vm_http_request_errors_total' + labelDict = dict((x['labelName'], x['labelValue']) for x in metricEvent['labels']) + promBody = { + "metric": { + "__name__": metricEvent['name'] + } | labelDict, + "values": [metricEvent['value']], + "timestamps": [int(t * 1000)], + } + log.info(promBody) + req = httpx.post( + self.agentImportUrl, + json=promBody, + headers={'content-type': 'application/stream+json'}, + ) + req.raise_for_status()