Mercurial > code > home > repos > homeauto
annotate lib/export_to_influxdb.py @ 429:fcd2c026f51e
exportStats for sending scales data to influxdb
Ignore-this: 715ff40fed97559efa37edf0efa58220
author | drewp@bigasterisk.com |
---|---|
date | Sun, 07 Apr 2019 03:56:17 -0700 |
parents | bf2174646809 |
children |
rev | line source |
---|---|
429
fcd2c026f51e
exportStats for sending scales data to influxdb
drewp@bigasterisk.com
parents:
348
diff
changeset
|
1 import time, logging, math, os, sys, socket |
293 | 2 from influxdb import InfluxDBClient |
3 from rdflib import Namespace | |
429
fcd2c026f51e
exportStats for sending scales data to influxdb
drewp@bigasterisk.com
parents:
348
diff
changeset
|
4 from twisted.internet import task |
293 | 5 |
# getLogger() with no name returns the root logger, so records from this
# module go to whatever handlers the application configured on the root.
log = logging.getLogger()
# Namespace for the room vocabulary terms this module reads from the config
# graph (influxMeasurement, predicate, measurement, tag, key, value, ...).
ROOM = Namespace('http://projects.bigasterisk.com/room/')
8 | |
429
fcd2c026f51e
exportStats for sending scales data to influxdb
drewp@bigasterisk.com
parents:
348
diff
changeset
|
class RetentionPolicies(object):
    """Creates InfluxDB retention policies on demand, once per day count.

    `influx` is the client object; only its `create_retention_policy`
    method is used here.
    """

    def __init__(self, influx):
        self.influx = influx
        # Day counts for which this object has already issued a create call.
        self.createdPolicies = set()

    def getCreatedPolicy(self, days):
        """Return the policy name for `days`, creating the policy first if
        this object has not created it yet.

        The name is 'ret_<days>_day' and the policy is created with a
        duration of '<days>d' and replication '1'. `days` is recorded only
        after the create call returns, so a failed create is retried on the
        next call.
        """
        policyName = 'ret_%d_day' % days
        if days in self.createdPolicies:
            return policyName

        self.influx.create_retention_policy(
            policyName, duration='%dd' % days, replication='1')
        self.createdPolicies.add(days)
        return policyName
fcd2c026f51e
exportStats for sending scales data to influxdb
drewp@bigasterisk.com
parents:
348
diff
changeset
|
22 |
class InfluxExporter(object):
    """Writes points to InfluxDB from two sources.

    * `exportStats` periodically samples values out of a stats object and
      writes them under a retention policy.
    * `exportToInflux` writes the object of every statement whose
      (subject, predicate) pair is configured as a measurement in the
      config graph, skipping repeats of an unchanged value.
    """

    def __init__(self, configGraph, influxHost='bang6'):
        """Connect to InfluxDB on `influxHost` and index the measurements
        declared in `configGraph`.

        configGraph: graph queried with `subject_objects`, `value` and
            `objects` (rdflib-style API).
        influxHost: host name passed to InfluxDBClient.
        """
        self.graph = configGraph
        # Positional args: host, port, username, password, database.
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        self.retentionPolicies = RetentionPolicies(self.influx)
        # (subj, measurementName, sorted tag items) : (time, value) last sent
        self.lastSent = {}
        # Time of the last successful exportStats write; 0 means none yet.
        self.lastExport = 0

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    @staticmethod
    def _statValue(stats, comps):
        """Walk `comps` down from `stats`; return the leaf, or None if any
        step is missing.

        Each step tries attribute access first and falls back to item
        access, matching the original lookup order.
        NOTE(review): because attributes win, a mapping key that collides
        with a method name of the container (e.g. 'values' on a dict)
        resolves to the method, not the entry -- confirm no stat is named so.
        """
        node = stats
        for comp in comps:
            parent = node
            node = getattr(parent, comp, None)
            if node is None:
                try:
                    node = parent[comp]
                except (KeyError, IndexError, TypeError):
                    # Missing key, or parent is not subscriptable by a str.
                    return None
            if node is None:
                return None
        return node

    @staticmethod
    def _statPoints(stats, paths, base, tags, now):
        """Build one InfluxDB point per resolvable, numeric, non-NaN path.

        The first dotted component of each path is dropped; the last one
        becomes the field name and the ones in between extend `base` to
        form the measurement name. Unresolvable or non-numeric paths are
        logged and skipped; NaN values are skipped silently.
        """
        points = []
        for path in paths:
            comps = path.split('.')[1:]
            # A path with no '.' has nothing left to look up.
            value = InfluxExporter._statValue(stats, comps) if comps else None
            if value is None:
                log.warning('no path %s', path)
                continue
            try:
                if math.isnan(value):
                    continue
            except TypeError:
                # Leaf is not a number (e.g. a container or a callable).
                log.warning('stat %s is not numeric: %r', path, value)
                continue
            points.append({
                'measurement': '.'.join(base + comps[:-1]),
                "tags": tags,
                "fields": {comps[-1]: value},
                "time": now
            })
        return points

    def exportStats(self, stats, paths, period_secs=10, retain_days=7):
        """Start a LoopingCall that writes the values at `paths` within
        `stats` to InfluxDB every `period_secs` seconds.

        stats: object (or nested mapping) to read values from.
        paths: iterable of dotted path strings; see `_statPoints`.
        period_secs: interval between samples; the first sample is taken
            after one full interval (`now=False`).
        retain_days: points are written under the retention policy
            returned by `self.retentionPolicies.getCreatedPolicy`.

        Measurement names are prefixed with 'stats.<script name>' and every
        point is tagged with this machine's host name.
        """
        # graphite version of this in scales/graphite.py
        base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]]
        tags = {'host': socket.gethostname()}

        def send():
            now = int(time.time())
            try:
                points = self._statPoints(stats, paths, base, tags, now)
                if not points:
                    return
                self.influx.write_points(
                    points, time_precision='s',
                    retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days))
                if self.lastExport == 0:
                    # Only the first successful write is logged in full.
                    log.info('writing stats to %r', points)
                self.lastExport = now
            except Exception:
                # An exception escaping into LoopingCall stops the loop for
                # good; log it and try again on the next tick instead.
                log.exception('exportStats failed; will retry next period')

        task.LoopingCall(send).start(period_secs, now=False)

    def exportToInflux(self, currentStatements):
        """Write one point for each statement that matches a configured
        measurement and passes `shouldSendNewPoint`.

        currentStatements: iterable of statements indexed as
            [0]=subject, [1]=predicate, [2]=object.

        Raises NotImplementedError (from `influxValue`) if a matching
        statement's object is not numeric; no points are written then.
        """
        graph = self.graph
        now = int(time.time())

        points = []
        for stmt in currentStatements:
            measKey = (stmt[0], stmt[1])
            if measKey not in self.measurements:
                continue
            meas = self.measurements[measKey]
            measurementName = graph.value(meas, ROOM['measurement'])
            tags = {}
            for t in graph.objects(meas, ROOM['tag']):
                k = graph.value(t, ROOM['key']).toPython()
                tags[k] = graph.value(t, ROOM['value']).toPython()

            value = self.influxValue(stmt[2])

            pale = 3600
            paleNode = graph.value(meas, ROOM['pointsAtLeastEvery'], default=None)
            # Compare against None rather than truthiness so a configured
            # value of 0 is honoured (an rdflib Literal wrapping 0 is falsy).
            if paleNode is not None:
                pale = paleNode.toPython()

            if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                           tags, value, pointsAtLeastEvery=pale):
                continue

            points.append({
                'measurement': measurementName,
                "tags": tags,
                "fields": {"value": value},
                "time": now
            })
            log.debug('send to influx %r', points[-1])
        if points:
            self.influx.write_points(points, time_precision='s')

    def influxValue(self, rdfValue):
        """Convert an RDF node to a number for InfluxDB.

        ROOM['motion'] and ROOM['pressed'] map to 1, ROOM['noMotion'] and
        ROOM['notPressed'] map to 0; anything else is converted with
        `toPython()`.

        Raises NotImplementedError if the converted value is not an int or
        float.
        """
        if rdfValue in [ROOM['motion'], ROOM['pressed']]:
            value = 1
        elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]:
            value = 0
        else:
            value = rdfValue.toPython()
        if not isinstance(value, (int, float)):
            raise NotImplementedError('value=%r' % value)
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery):
        """Decide whether a point should be written, and record it if so.

        Returns False when the same (subj, measurementName, tags) series
        last sent an equal value less than `pointsAtLeastEvery` seconds
        before `now`. Otherwise stores (now, value) as the last sent point
        for that series and returns True.
        """
        key = (subj, measurementName, tuple(sorted(tags.items())))
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - pointsAtLeastEvery:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True