changeset 1580:f440c731b1d5

reformat, switch to prometheus, maybe other fixes
author drewp@bigasterisk.com
date Sun, 29 Aug 2021 12:56:14 -0700
parents f71c9ceb948e
children 137ba2d6d016
files lib/export_to_influxdb/export_to_influxdb.py
diffstat 1 files changed, 33 insertions(+), 34 deletions(-) [+]
line wrap: on
line diff
--- a/lib/export_to_influxdb/export_to_influxdb.py	Thu Aug 26 18:26:20 2021 -0700
+++ b/lib/export_to_influxdb/export_to_influxdb.py	Sun Aug 29 12:56:14 2021 -0700
@@ -1,51 +1,60 @@
-import time, logging, math, os, sys, socket
+import logging
+import math
+import os
+import socket
+import sys
+import time
+
 from influxdb import InfluxDBClient
+from prometheus_client import Counter, Gauge, Histogram, Summary
 from rdflib import Namespace
 from twisted.internet import task
-from greplin import scales
 
-log = logging.getLogger()
+log = logging.getLogger('influxExport')
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
-stats = scales.collection(
-    '/export_to_influxdb',
-    scales.PmfStat('exportToInflux'),
-)
+EXPORT_TO_INFLUX = Summary("export_to_influx_calls", 'calls')
 
 
 class RetentionPolicies(object):
+
     def __init__(self, influx):
         self.influx = influx
-        self.createdPolicies = set() # days
+        self.createdPolicies = set()  # days
 
     def getCreatedPolicy(self, days):
         name = 'ret_%d_day' % days
         if days not in self.createdPolicies:
-            self.influx.create_retention_policy(name,
-                                                duration='%dd' % days,
-                                                replication='1')
+            self.influx.create_retention_policy(name, duration='%dd' % days, replication='1')
             self.createdPolicies.add(days)
         return name
 
+
 class InfluxExporter(object):
+
     def __init__(self, configGraph, influxHost='bang5'):
         self.graph = configGraph
-        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
+        self.influx = InfluxDBClient(influxHost, 9060)
+        self.influx.create_database('main')
+        self.influx.switch_database('main')
+
         self.retentionPolicies = RetentionPolicies(self.influx)
         self.lastSent = {}
         self.lastExport = 0
 
-        self.measurements = {} # (subj, predicate) : measurement
+        self.measurements = {}  # (subj, predicate) : measurement
         for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
             self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m
 
     def exportStats(self, stats, paths, period_secs=10, retain_days=7):
         # graphite version of this in scales/graphite.py
         base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]]
-        tags  = {'host': socket.gethostname()}
+        tags = {'host': socket.gethostname()}
+
         def send():
             now = int(time.time())
             points = []
+
             def getVal(path):
                 x = stats
                 comps = path.split('.')[1:]
@@ -59,26 +68,22 @@
                             return
                 if math.isnan(x):
                     return
-                points.append({
-                    'measurement': '.'.join(base + comps[:-1]),
-                    "tags": tags,
-                    "fields": {comps[-1]: x},
-                    "time": now
-                })
+                points.append({'measurement': '.'.join(base + comps[:-1]), "tags": tags, "fields": {comps[-1]: x}, "time": now})
+
             for path in paths:
                 getVal(path)
             if points:
-                self.influx.write_points(
-                    points, time_precision='s',
-                    retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days))
+                self.influx.write_points(points,
+                                         time_precision='s',
+                                         retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days))
                 if self.lastExport == 0:
                     log.info('writing stats to %r', points)
                 self.lastExport = now
-                #print('send %r' % points)
+                log.debug(f'sent points {points!r}')
 
         task.LoopingCall(send).start(period_secs, now=False)
 
-    @stats.exportToInflux.time()
+    @EXPORT_TO_INFLUX.time()
     def exportToInflux(self, currentStatements):
         """
         looks for
@@ -98,7 +103,7 @@
         for stmt in currentStatements:
             if (stmt[0], stmt[1]) in self.measurements:
                 meas = self.measurements[(stmt[0], stmt[1])]
-                measurementName = graph.value(meas, ROOM['measurement'])
+                measurementName = graph.value(meas, ROOM['measurement']).toPython()
                 tags = {}
                 for t in graph.objects(meas, ROOM['tag']):
                     k = graph.value(t, ROOM['key']).toPython()
@@ -109,16 +114,10 @@
                 if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None):
                     pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython()
 
-                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
-                                               tags, value, pointsAtLeastEvery=pale):
+                if not self.shouldSendNewPoint(now, stmt[0], measurementName, tags, value, pointsAtLeastEvery=pale):
                     continue
 
-                points.append({
-                    'measurement': measurementName,
-                    "tags": tags,
-                    "fields": {"value": value},
-                    "time": now
-                })
+                points.append({'measurement': measurementName, "tags": tags, "fields": {"value": value}, "time": now})
                 log.debug('send to influx %r', points[-1])
         if points:
             self.influx.write_points(points, time_precision='s')