Mercurial > code > home > repos > homeauto
changeset 1142:eb36b30f53b9
move export_to_influxdb up to lib
Ignore-this: d43cc183b51bec93b28e6a7ba7bf1d6e
darcs-hash:13dd7219f8eb69b32ce9ac7b93d0ff20fcf93e22
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Sat, 03 Mar 2018 18:08:03 -0800 |
parents | 34fa399dc5c9 |
children | d1bc88f67969 |
files | lib/export_to_influxdb.py service/piNode/export_to_influxdb.py |
diffstat | 2 files changed, 69 insertions(+), 69 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/lib/export_to_influxdb.py Sat Mar 03 18:08:03 2018 -0800 @@ -0,0 +1,69 @@ +import time, logging +from influxdb import InfluxDBClient +from rdflib import Namespace + +log = logging.getLogger() +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +class InfluxExporter(object): + def __init__(self, configGraph, influxHost='bang6'): + self.graph = configGraph + self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.lastExport = 0 + self.lastSent = {} # (subj, measurementName, tags) : (time, value) + + self.measurements = {} # (subj, predicate) : measurement + for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): + self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m + + def exportToInflux(self, currentStatements): + graph = self.graph + now = int(time.time()) + + points = [] + for stmt in currentStatements: + if (stmt[0], stmt[1]) in self.measurements: + meas = self.measurements[(stmt[0], stmt[1])] + measurementName = graph.value(meas, ROOM['measurement']) + tags = {} + for t in graph.objects(meas, ROOM['tag']): + k = graph.value(t, ROOM['key']).toPython() + tags[k] = graph.value(t, ROOM['value']).toPython() + + value = self.influxValue(stmt[2]) + + if not self.shouldSendNewPoint(now, stmt[0], measurementName, + tags, value): + continue + + points.append({ + 'measurement': measurementName, + "tags": tags, + "fields": {"value": value}, + "time": now + }) + log.debug('send to influx %r', points[-1]) + if points: + self.influx.write_points(points, time_precision='s') + + def influxValue(self, rdfValue): + if rdfValue in [ROOM['motion'], ROOM['pressed']]: + value = 1 + elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: + value = 0 + else: + value = rdfValue.toPython() + if not isinstance(value, (int, float)): + raise NotImplementedError('value=%r' % value) + return value + + def shouldSendNewPoint(self, now, subj, measurementName, tags, value): + key = (subj, measurementName, tuple(sorted(tags.items()))) + if key in self.lastSent: + lastTime, lastValue = self.lastSent[key] + if lastValue == value and lastTime > now - 3600: + log.debug('skip influx point %r', key) + return False + + self.lastSent[key] = (now, value) + return True
--- a/service/piNode/export_to_influxdb.py Sat Mar 03 18:01:02 2018 -0800 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,69 +0,0 @@ -import time, logging -from influxdb import InfluxDBClient -from rdflib import Namespace - -log = logging.getLogger() -ROOM = Namespace('http://projects.bigasterisk.com/room/') - -class InfluxExporter(object): - def __init__(self, configGraph, influxHost='bang6'): - self.graph = configGraph - self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') - self.lastExport = 0 - self.lastSent = {} # (subj, measurementName, tags) : (time, value) - - self.measurements = {} # (subj, predicate) : measurement - for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): - self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m - - def exportToInflux(self, currentStatements): - graph = self.graph - now = int(time.time()) - - points = [] - for stmt in currentStatements: - if (stmt[0], stmt[1]) in self.measurements: - meas = self.measurements[(stmt[0], stmt[1])] - measurementName = graph.value(meas, ROOM['measurement']) - tags = {} - for t in graph.objects(meas, ROOM['tag']): - k = graph.value(t, ROOM['key']).toPython() - tags[k] = graph.value(t, ROOM['value']).toPython() - - value = self.influxValue(stmt[2]) - - if not self.shouldSendNewPoint(now, stmt[0], measurementName, - tags, value): - continue - - points.append({ - 'measurement': measurementName, - "tags": tags, - "fields": {"value": value}, - "time": now - }) - log.debug('send to influx %r', points[-1]) - if points: - self.influx.write_points(points, time_precision='s') - - def influxValue(self, rdfValue): - if rdfValue in [ROOM['motion'], ROOM['pressed']]: - value = 1 - elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: - value = 0 - else: - value = rdfValue.toPython() - if not isinstance(value, (int, float)): - raise NotImplementedError('value=%r' % value) - return value - - def shouldSendNewPoint(self, now, subj, measurementName, tags, value): - key = (subj, measurementName, tuple(sorted(tags.items()))) - if key in self.lastSent: - lastTime, lastValue = self.lastSent[key] - if lastValue == value and lastTime > now - 3600: - log.debug('skip influx point %r', key) - return False - - self.lastSent[key] = (now, value) - return True