annotate service/piNode/export_to_influxdb.py @ 293:fc0e42933baa

save data to influxdb, not graphite Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author drewp@bigasterisk.com
date Mon, 01 Aug 2016 02:26:38 -0700
parents
children e7cbf250188a
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
293
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
1 import time, logging
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
2 from influxdb import InfluxDBClient
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
3 from rdflib import Namespace
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
4
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
# Module-wide logger. getLogger() with no name returns the root logger, so
# these messages go to whatever handlers the hosting service configured.
log = logging.getLogger()
# RDF namespace for the room vocabulary; indexed below (e.g. ROOM['tag']) to
# build the predicate and value URIs looked up in the config graph.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
7
fc0e42933baa save data to influxdb, not graphite
drewp@bigasterisk.com
parents:
diff changeset
class InfluxExporter(object):
    """Write selected RDF statements to InfluxDB as measurement points.

    The config graph says which (subject, predicate) pairs are exported:
    each subject has ROOM['influxMeasurement'] links to measurement nodes,
    and every measurement node carries a ROOM['predicate'], a
    ROOM['measurement'] name and any number of ROOM['tag'] nodes (each
    with a ROOM['key'] and a ROOM['value']).

    A value that has not changed is re-sent at most once every
    RESEND_SECS seconds.
    """

    # How long an unchanged value is suppressed before being sent again.
    RESEND_SECS = 3600

    def __init__(self, configGraph, influxHost='bang6'):
        """Connect to InfluxDB and index the measurements in configGraph.

        configGraph: rdflib graph holding the measurement config.
        influxHost: hostname of the InfluxDB server.
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        self.lastExport = 0
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Send one point for each statement that matches a measurement.

        currentStatements: iterable of (subject, predicate, object)
        triples. Statements whose (subject, predicate) is not configured
        are ignored, as are values shouldSendNewPoint decides to skip.

        All points share one timestamp (whole seconds) and go out in a
        single write. If anything raises before the write has succeeded,
        the lastSent entries changed by this call are put back, so points
        that never reached InfluxDB are not treated as sent; the exception
        then propagates to the caller.
        """
        graph = self.graph
        now = int(time.time())

        log.debug('influxdb export:')

        points = []
        # key -> lastSent entry from before this call (None if absent).
        previous = {}
        try:
            for stmt in currentStatements:
                meas = self.measurements.get((stmt[0], stmt[1]))
                if meas is None:
                    continue
                measurementName = graph.value(meas, ROOM['measurement'])
                tags = self._tagsFor(meas)

                value = self.influxValue(stmt[2])

                key = self._sentKey(stmt[0], measurementName, tags)
                prior = self.lastSent.get(key)
                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                               tags, value):
                    continue
                # Keep only the first snapshot: if a key repeats within the
                # batch, the pre-batch state is the one to restore.
                previous.setdefault(key, prior)

                points.append({
                    'measurement': measurementName,
                    "tags": tags,
                    "fields": {"value": value},
                    "time": now
                })
                log.debug('send to influx %r', points[-1])
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            self._restoreLastSent(previous)
            raise

    def influxValue(self, rdfValue):
        """Return the number to store for an RDF object value.

        ROOM['motion'] maps to 1 and ROOM['noMotion'] to 0. Anything else
        is converted with toPython() and must come out as an int or float.

        Raises NotImplementedError for any other converted type.
        """
        if rdfValue == ROOM['motion']:
            value = 1
        elif rdfValue == ROOM['noMotion']:
            value = 0
        else:
            value = rdfValue.toPython()
            if not isinstance(value, (int, float)):
                raise NotImplementedError('value=%r' % value)
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value):
        """Decide whether this point is worth sending, and record it if so.

        Returns False when the same value was recorded for this
        (subj, measurementName, tags) less than RESEND_SECS seconds before
        `now`. Otherwise stores (now, value) in lastSent and returns True.
        """
        key = self._sentKey(subj, measurementName, tags)
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - self.RESEND_SECS:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True

    def _sentKey(self, subj, measurementName, tags):
        """Build the hashable lastSent key; tags are sorted for stability."""
        return (subj, measurementName, tuple(sorted(tags.items())))

    def _tagsFor(self, meas):
        """Collect the {key: value} tag dict configured for a measurement."""
        graph = self.graph
        tags = {}
        for t in graph.objects(meas, ROOM['tag']):
            k = graph.value(t, ROOM['key']).toPython()
            tags[k] = graph.value(t, ROOM['value']).toPython()
        return tags

    def _restoreLastSent(self, previous):
        """Put lastSent back to the snapshot taken before a failed export."""
        for key, prior in previous.items():
            if prior is None:
                self.lastSent.pop(key, None)
            else:
                self.lastSent[key] = prior