Mercurial > code > home > repos > homeauto
view lib/export_to_influxdb.py @ 429:fcd2c026f51e
exportStats for sending scales data to influxdb
Ignore-this: 715ff40fed97559efa37edf0efa58220
author | drewp@bigasterisk.com |
---|---|
date | Sun, 07 Apr 2019 03:56:17 -0700 |
parents | bf2174646809 |
children |
line wrap: on
line source
import time, logging, math, os, sys, socket from influxdb import InfluxDBClient from rdflib import Namespace from twisted.internet import task log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') class RetentionPolicies(object): def __init__(self, influx): self.influx = influx self.createdPolicies = set() # days def getCreatedPolicy(self, days): name = 'ret_%d_day' % days if days not in self.createdPolicies: self.influx.create_retention_policy(name, duration='%dd' % days, replication='1') self.createdPolicies.add(days) return name class InfluxExporter(object): def __init__(self, configGraph, influxHost='bang6'): self.graph = configGraph self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') self.retentionPolicies = RetentionPolicies(self.influx) self.lastSent = {} self.lastExport = 0 self.measurements = {} # (subj, predicate) : measurement for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m def exportStats(self, stats, paths, period_secs=10, retain_days=7): # graphite version of this in scales/graphite.py base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] tags = {'host': socket.gethostname()} def send(): now = int(time.time()) points = [] def getVal(path): x = stats comps = path.split('.')[1:] for comp in comps: x2 = x x = getattr(x, comp, None) if x is None: x = x2[comp] if x is None: print("no path %s" % path) return if math.isnan(x): return points.append({ 'measurement': '.'.join(base + comps[:-1]), "tags": tags, "fields": {comps[-1]: x}, "time": now }) for path in paths: getVal(path) if points: self.influx.write_points( points, time_precision='s', retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) if self.lastExport == 0: log.info('writing stats to %r', points) self.lastExport = now #print('send %r' % points) task.LoopingCall(send).start(period_secs, now=False) def exportToInflux(self, currentStatements): graph = self.graph now = int(time.time()) points = [] for stmt in currentStatements: if (stmt[0], stmt[1]) in self.measurements: meas = self.measurements[(stmt[0], stmt[1])] measurementName = graph.value(meas, ROOM['measurement']) tags = {} for t in graph.objects(meas, ROOM['tag']): k = graph.value(t, ROOM['key']).toPython() tags[k] = graph.value(t, ROOM['value']).toPython() value = self.influxValue(stmt[2]) pale = 3600 if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None): pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython() if not self.shouldSendNewPoint(now, stmt[0], measurementName, tags, value, pointsAtLeastEvery=pale): continue points.append({ 'measurement': measurementName, "tags": tags, "fields": {"value": value}, "time": now }) log.debug('send to influx %r', points[-1]) if points: self.influx.write_points(points, time_precision='s') def influxValue(self, rdfValue): if rdfValue in [ROOM['motion'], ROOM['pressed']]: value = 1 elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: value = 0 else: value = rdfValue.toPython() if not isinstance(value, (int, float)): raise NotImplementedError('value=%r' % value) return value def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery): key = (subj, measurementName, tuple(sorted(tags.items()))) if key in self.lastSent: lastTime, lastValue = self.lastSent[key] if lastValue == value and lastTime > now - pointsAtLeastEvery: log.debug('skip influx point %r', key) return False self.lastSent[key] = (now, value) return True