Mercurial > code > home > repos > homeauto
diff lib/export_to_influxdb.py @ 1232:b50a13ef20ba
exportStats for sending scales data to influxdb
Ignore-this: 715ff40fed97559efa37edf0efa58220
darcs-hash:bcdd9713c4bf9c2ebe732ae1d30edc44f1be704f
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sun, 07 Apr 2019 03:56:17 -0700 |
parents | e4f49cd9dda3 |
children |
line wrap: on
line diff
--- a/lib/export_to_influxdb.py Thu Apr 04 02:22:04 2019 -0700 +++ b/lib/export_to_influxdb.py Sun Apr 07 03:56:17 2019 -0700 @@ -1,21 +1,76 @@ -import time, logging +import time, logging, math, os, sys, socket from influxdb import InfluxDBClient from rdflib import Namespace +from twisted.internet import task log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') +class RetentionPolicies(object): + def __init__(self, influx): + self.influx = influx + self.createdPolicies = set() # days + + def getCreatedPolicy(self, days): + name = 'ret_%d_day' % days + if days not in self.createdPolicies: + self.influx.create_retention_policy(name, + duration='%dd' % days, + replication='1') + self.createdPolicies.add(days) + return name + class InfluxExporter(object): def __init__(self, configGraph, influxHost='bang6'): self.graph = configGraph self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.retentionPolicies = RetentionPolicies(self.influx) + self.lastSent = {} self.lastExport = 0 - self.lastSent = {} # (subj, measurementName, tags) : (time, value) self.measurements = {} # (subj, predicate) : measurement for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m - + + def exportStats(self, stats, paths, period_secs=10, retain_days=7): + # graphite version of this in scales/graphite.py + base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] + tags = {'host': socket.gethostname()} + def send(): + now = int(time.time()) + points = [] + def getVal(path): + x = stats + comps = path.split('.')[1:] + for comp in comps: + x2 = x + x = getattr(x, comp, None) + if x is None: + x = x2[comp] + if x is None: + print("no path %s" % path) + return + if math.isnan(x): + return + points.append({ + 'measurement': '.'.join(base + comps[:-1]), + "tags": tags, + "fields": {comps[-1]: x}, + "time": now + }) + for path in paths: + getVal(path) + if points: + self.influx.write_points( + points, time_precision='s', + retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) + if self.lastExport == 0: + log.info('writing stats to %r', points) + self.lastExport = now + #print('send %r' % points) + + task.LoopingCall(send).start(period_secs, now=False) + def exportToInflux(self, currentStatements): graph = self.graph now = int(time.time())