Mercurial > code > home > repos > homeauto
changeset 1580:f440c731b1d5
reformat, switch to prometheus, maybe other fixes
author | drewp@bigasterisk.com |
---|---|
date | Sun, 29 Aug 2021 12:56:14 -0700 |
parents | f71c9ceb948e |
children | 137ba2d6d016 |
files | lib/export_to_influxdb/export_to_influxdb.py |
diffstat | 1 files changed, 33 insertions(+), 34 deletions(-) [+] |
line wrap: on
line diff
--- a/lib/export_to_influxdb/export_to_influxdb.py Thu Aug 26 18:26:20 2021 -0700 +++ b/lib/export_to_influxdb/export_to_influxdb.py Sun Aug 29 12:56:14 2021 -0700 @@ -1,51 +1,60 @@ -import time, logging, math, os, sys, socket +import logging +import math +import os +import socket +import sys +import time + from influxdb import InfluxDBClient +from prometheus_client import Counter, Gauge, Histogram, Summary from rdflib import Namespace from twisted.internet import task -from greplin import scales -log = logging.getLogger() +log = logging.getLogger('influxExport') ROOM = Namespace('http://projects.bigasterisk.com/room/') -stats = scales.collection( - '/export_to_influxdb', - scales.PmfStat('exportToInflux'), -) +EXPORT_TO_INFLUX = Summary("export_to_influx_calls", 'calls') class RetentionPolicies(object): + def __init__(self, influx): self.influx = influx - self.createdPolicies = set() # days + self.createdPolicies = set() # days def getCreatedPolicy(self, days): name = 'ret_%d_day' % days if days not in self.createdPolicies: - self.influx.create_retention_policy(name, - duration='%dd' % days, - replication='1') + self.influx.create_retention_policy(name, duration='%dd' % days, replication='1') self.createdPolicies.add(days) return name + class InfluxExporter(object): + def __init__(self, configGraph, influxHost='bang5'): self.graph = configGraph - self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.influx = InfluxDBClient(influxHost, 9060) + self.influx.create_database('main') + self.influx.switch_database('main') + self.retentionPolicies = RetentionPolicies(self.influx) self.lastSent = {} self.lastExport = 0 - self.measurements = {} # (subj, predicate) : measurement + self.measurements = {} # (subj, predicate) : measurement for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m def exportStats(self, stats, paths, period_secs=10, retain_days=7): # graphite version of this in scales/graphite.py base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] - tags = {'host': socket.gethostname()} + tags = {'host': socket.gethostname()} + def send(): now = int(time.time()) points = [] + def getVal(path): x = stats comps = path.split('.')[1:] @@ -59,26 +68,22 @@ return if math.isnan(x): return - points.append({ - 'measurement': '.'.join(base + comps[:-1]), - "tags": tags, - "fields": {comps[-1]: x}, - "time": now - }) + points.append({'measurement': '.'.join(base + comps[:-1]), "tags": tags, "fields": {comps[-1]: x}, "time": now}) + for path in paths: getVal(path) if points: - self.influx.write_points( - points, time_precision='s', - retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) + self.influx.write_points(points, + time_precision='s', + retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) if self.lastExport == 0: log.info('writing stats to %r', points) self.lastExport = now - #print('send %r' % points) + log.debug(f'sent points {points!r}') task.LoopingCall(send).start(period_secs, now=False) - @stats.exportToInflux.time() + @EXPORT_TO_INFLUX.time() def exportToInflux(self, currentStatements): """ looks for @@ -98,7 +103,7 @@ for stmt in currentStatements: if (stmt[0], stmt[1]) in self.measurements: meas = self.measurements[(stmt[0], stmt[1])] - measurementName = graph.value(meas, ROOM['measurement']) + measurementName = graph.value(meas, ROOM['measurement']).toPython() tags = {} for t in graph.objects(meas, ROOM['tag']): k = graph.value(t, ROOM['key']).toPython() @@ -109,16 +114,10 @@ if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None): pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython() - if not self.shouldSendNewPoint(now, stmt[0], measurementName, - tags, value, pointsAtLeastEvery=pale): + if not self.shouldSendNewPoint(now, stmt[0], measurementName, tags, value, pointsAtLeastEvery=pale): continue - points.append({ - 'measurement': measurementName, - "tags": tags, - "fields": {"value": value}, - "time": now - }) + points.append({'measurement': measurementName, "tags": tags, "fields": {"value": value}, "time": now}) log.debug('send to influx %r', points[-1]) if points: self.influx.write_points(points, time_precision='s')