# HG changeset patch # User drewp # Date 1556259159 25200 # Node ID 77f759551fe512516a396f15b3f633726551fbe6 # Parent e681221ab8a345906e3fda15f6f6040a4b6086dd export_to_influxdb to new package Ignore-this: 573ebd40dcc63350bce2bf573f8908c1 darcs-hash:12c99cba3724d1a77a723def79bd54b193d831e2 diff -r e681221ab8a3 -r 77f759551fe5 lib/export_to_influxdb.py --- a/lib/export_to_influxdb.py Thu Apr 25 23:01:26 2019 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,127 +0,0 @@ -import time, logging, math, os, sys, socket -from influxdb import InfluxDBClient -from rdflib import Namespace -from twisted.internet import task - -log = logging.getLogger() -ROOM = Namespace('http://projects.bigasterisk.com/room/') - -class RetentionPolicies(object): - def __init__(self, influx): - self.influx = influx - self.createdPolicies = set() # days - - def getCreatedPolicy(self, days): - name = 'ret_%d_day' % days - if days not in self.createdPolicies: - self.influx.create_retention_policy(name, - duration='%dd' % days, - replication='1') - self.createdPolicies.add(days) - return name - -class InfluxExporter(object): - def __init__(self, configGraph, influxHost='bang6'): - self.graph = configGraph - self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') - self.retentionPolicies = RetentionPolicies(self.influx) - self.lastSent = {} - self.lastExport = 0 - - self.measurements = {} # (subj, predicate) : measurement - for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): - self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m - - def exportStats(self, stats, paths, period_secs=10, retain_days=7): - # graphite version of this in scales/graphite.py - base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] - tags = {'host': socket.gethostname()} - def send(): - now = int(time.time()) - points = [] - def getVal(path): - x = stats - comps = path.split('.')[1:] - for comp in comps: - x2 = x - x = getattr(x, comp, None) - if x is None: - x = x2[comp] - if x is None: - print("no path %s" % path) - return - if math.isnan(x): - return - points.append({ - 'measurement': '.'.join(base + comps[:-1]), - "tags": tags, - "fields": {comps[-1]: x}, - "time": now - }) - for path in paths: - getVal(path) - if points: - self.influx.write_points( - points, time_precision='s', - retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) - if self.lastExport == 0: - log.info('writing stats to %r', points) - self.lastExport = now - #print('send %r' % points) - - task.LoopingCall(send).start(period_secs, now=False) - - def exportToInflux(self, currentStatements): - graph = self.graph - now = int(time.time()) - - points = [] - for stmt in currentStatements: - if (stmt[0], stmt[1]) in self.measurements: - meas = self.measurements[(stmt[0], stmt[1])] - measurementName = graph.value(meas, ROOM['measurement']) - tags = {} - for t in graph.objects(meas, ROOM['tag']): - k = graph.value(t, ROOM['key']).toPython() - tags[k] = graph.value(t, ROOM['value']).toPython() - - value = self.influxValue(stmt[2]) - pale = 3600 - if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None): - pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython() - - if not self.shouldSendNewPoint(now, stmt[0], measurementName, - tags, value, pointsAtLeastEvery=pale): - continue - - points.append({ - 'measurement': measurementName, - "tags": tags, - "fields": {"value": value}, - "time": now - }) - log.debug('send to influx %r', points[-1]) - if points: - self.influx.write_points(points, time_precision='s') - - def influxValue(self, rdfValue): - if rdfValue in [ROOM['motion'], ROOM['pressed']]: - value = 1 - elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: - value = 0 - else: - value = rdfValue.toPython() - if not isinstance(value, (int, float)): - raise NotImplementedError('value=%r' % value) - return value - - def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery): - key = (subj, measurementName, tuple(sorted(tags.items()))) - if key in self.lastSent: - lastTime, lastValue = self.lastSent[key] - if lastValue == value and lastTime > now - pointsAtLeastEvery: - log.debug('skip influx point %r', key) - return False - - self.lastSent[key] = (now, value) - return True diff -r e681221ab8a3 -r 77f759551fe5 lib/export_to_influxdb/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/export_to_influxdb/__init__.py Thu Apr 25 23:12:39 2019 -0700 @@ -0,0 +1,1 @@ +from .export_to_influxdb import * diff -r e681221ab8a3 -r 77f759551fe5 lib/export_to_influxdb/export_to_influxdb.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/export_to_influxdb/export_to_influxdb.py Thu Apr 25 23:12:39 2019 -0700 @@ -0,0 +1,127 @@ +import time, logging, math, os, sys, socket +from influxdb import InfluxDBClient +from rdflib import Namespace +from twisted.internet import task + +log = logging.getLogger() +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +class RetentionPolicies(object): + def __init__(self, influx): + self.influx = influx + self.createdPolicies = set() # days + + def getCreatedPolicy(self, days): + name = 'ret_%d_day' % days + if days not in self.createdPolicies: + self.influx.create_retention_policy(name, + duration='%dd' % days, + replication='1') + self.createdPolicies.add(days) + return name + +class InfluxExporter(object): + def __init__(self, configGraph, influxHost='bang6'): + self.graph = configGraph + self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.retentionPolicies = RetentionPolicies(self.influx) + self.lastSent = {} + self.lastExport = 0 + + self.measurements = {} # (subj, predicate) : measurement + for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): + self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m + + def exportStats(self, stats, paths, period_secs=10, retain_days=7): + # graphite version of this in scales/graphite.py + base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] + tags = {'host': socket.gethostname()} + def send(): + now = int(time.time()) + points = [] + def getVal(path): + x = stats + comps = path.split('.')[1:] + for comp in comps: + x2 = x + x = getattr(x, comp, None) + if x is None: + x = x2[comp] + if x is None: + print("no path %s" % path) + return + if math.isnan(x): + return + points.append({ + 'measurement': '.'.join(base + comps[:-1]), + "tags": tags, + "fields": {comps[-1]: x}, + "time": now + }) + for path in paths: + getVal(path) + if points: + self.influx.write_points( + points, time_precision='s', + retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) + if self.lastExport == 0: + log.info('writing stats to %r', points) + self.lastExport = now + #print('send %r' % points) + + task.LoopingCall(send).start(period_secs, now=False) + + def exportToInflux(self, currentStatements): + graph = self.graph + now = int(time.time()) + + points = [] + for stmt in currentStatements: + if (stmt[0], stmt[1]) in self.measurements: + meas = self.measurements[(stmt[0], stmt[1])] + measurementName = graph.value(meas, ROOM['measurement']) + tags = {} + for t in graph.objects(meas, ROOM['tag']): + k = graph.value(t, ROOM['key']).toPython() + tags[k] = graph.value(t, ROOM['value']).toPython() + + value = self.influxValue(stmt[2]) + pale = 3600 + if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None): + pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython() + + if not self.shouldSendNewPoint(now, stmt[0], measurementName, + tags, value, pointsAtLeastEvery=pale): + continue + + points.append({ + 'measurement': measurementName, + "tags": tags, + "fields": {"value": value}, + "time": now + }) + log.debug('send to influx %r', points[-1]) + if points: + self.influx.write_points(points, time_precision='s') + + def influxValue(self, rdfValue): + if rdfValue in [ROOM['motion'], ROOM['pressed']]: + value = 1 + elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: + value = 0 + else: + value = rdfValue.toPython() + if not isinstance(value, (int, float)): + raise NotImplementedError('value=%r' % value) + return value + + def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery): + key = (subj, measurementName, tuple(sorted(tags.items()))) + if key in self.lastSent: + lastTime, lastValue = self.lastSent[key] + if lastValue == value and lastTime > now - pointsAtLeastEvery: + log.debug('skip influx point %r', key) + return False + + self.lastSent[key] = (now, value) + return True diff -r e681221ab8a3 -r 77f759551fe5 lib/export_to_influxdb/setup.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/export_to_influxdb/setup.py Thu Apr 25 23:12:39 2019 -0700 @@ -0,0 +1,11 @@ +from setuptools import setup + +setup( + name='export_to_influxdb', + version='0.0.0', + packages=['export_to_influxdb'], + package_dir={'export_to_influxdb': ''}, + url='https://projects.bigasterisk.com/export-to-influxdb/export_to_influxdb-0.0.0.tar.gz', + author='Drew Perttula', + author_email='drewp@bigasterisk.com', +) diff -r e681221ab8a3 -r 77f759551fe5 lib/export_to_influxdb/tasks.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/export_to_influxdb/tasks.py Thu Apr 25 23:12:39 2019 -0700 @@ -0,0 +1,9 @@ +from invoke import task + +import sys +sys.path.append('/my/proj/release') +from release import local_release + +@task +def release(ctx): + local_release(ctx)