Mercurial > code > home > repos > homeauto
annotate lib/export_to_influxdb.py @ 1341:08dc3baa7d14
release 0.2.0
Ignore-this: 720a712599a00ca21e1a8a97a7c03a34
darcs-hash:c45119596b434bd7ffa2b276f1ec1de0095ecb28
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Thu, 25 Apr 2019 22:17:10 -0700 |
parents | b50a13ef20ba |
children |
rev | line source |
---|---|
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
1 import time, logging, math, os, sys, socket |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 from influxdb import InfluxDBClient |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from rdflib import Namespace |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
4 from twisted.internet import task |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
5 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
6 log = logging.getLogger() |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
8 |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
9 class RetentionPolicies(object): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
10 def __init__(self, influx): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
11 self.influx = influx |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
12 self.createdPolicies = set() # days |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
13 |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
14 def getCreatedPolicy(self, days): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
15 name = 'ret_%d_day' % days |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
16 if days not in self.createdPolicies: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
17 self.influx.create_retention_policy(name, |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
18 duration='%dd' % days, |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
19 replication='1') |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
20 self.createdPolicies.add(days) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
21 return name |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
22 |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
23 class InfluxExporter(object): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
24 def __init__(self, configGraph, influxHost='bang6'): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
25 self.graph = configGraph |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
26 self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
27 self.retentionPolicies = RetentionPolicies(self.influx) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
28 self.lastSent = {} |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
29 self.lastExport = 0 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
30 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
31 self.measurements = {} # (subj, predicate) : measurement |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
32 for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
33 self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m |
1232
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
34 |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
35 def exportStats(self, stats, paths, period_secs=10, retain_days=7): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
36 # graphite version of this in scales/graphite.py |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
37 base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
38 tags = {'host': socket.gethostname()} |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
39 def send(): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
40 now = int(time.time()) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
41 points = [] |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
42 def getVal(path): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
43 x = stats |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
44 comps = path.split('.')[1:] |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
45 for comp in comps: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
46 x2 = x |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
47 x = getattr(x, comp, None) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
48 if x is None: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
49 x = x2[comp] |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
50 if x is None: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
51 print("no path %s" % path) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
52 return |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
53 if math.isnan(x): |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
54 return |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
55 points.append({ |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
56 'measurement': '.'.join(base + comps[:-1]), |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
57 "tags": tags, |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
58 "fields": {comps[-1]: x}, |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
59 "time": now |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
60 }) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
61 for path in paths: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
62 getVal(path) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
63 if points: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
64 self.influx.write_points( |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
65 points, time_precision='s', |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
66 retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
67 if self.lastExport == 0: |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
68 log.info('writing stats to %r', points) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
69 self.lastExport = now |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
70 #print('send %r' % points) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
71 |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
72 task.LoopingCall(send).start(period_secs, now=False) |
b50a13ef20ba
exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents:
1153
diff
changeset
|
73 |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
74 def exportToInflux(self, currentStatements): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
75 graph = self.graph |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
76 now = int(time.time()) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
77 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
78 points = [] |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
79 for stmt in currentStatements: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
80 if (stmt[0], stmt[1]) in self.measurements: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
81 meas = self.measurements[(stmt[0], stmt[1])] |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
82 measurementName = graph.value(meas, ROOM['measurement']) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
83 tags = {} |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
84 for t in graph.objects(meas, ROOM['tag']): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
85 k = graph.value(t, ROOM['key']).toPython() |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
86 tags[k] = graph.value(t, ROOM['value']).toPython() |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
87 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
88 value = self.influxValue(stmt[2]) |
1153
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
89 pale = 3600 |
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
90 if graph.value(meas, ROOM['pointsAtLeastEvery'], default=None): |
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
91 pale = graph.value(meas, ROOM['pointsAtLeastEvery']).toPython() |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
92 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
93 if not self.shouldSendNewPoint(now, stmt[0], measurementName, |
1153
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
94 tags, value, pointsAtLeastEvery=pale): |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
95 continue |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
96 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
97 points.append({ |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
98 'measurement': measurementName, |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
99 "tags": tags, |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
100 "fields": {"value": value}, |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
101 "time": now |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
102 }) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
103 log.debug('send to influx %r', points[-1]) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
104 if points: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
105 self.influx.write_points(points, time_precision='s') |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
106 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
107 def influxValue(self, rdfValue): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
108 if rdfValue in [ROOM['motion'], ROOM['pressed']]: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
109 value = 1 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
110 elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
111 value = 0 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
112 else: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
113 value = rdfValue.toPython() |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
114 if not isinstance(value, (int, float)): |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
115 raise NotImplementedError('value=%r' % value) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
116 return value |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
117 |
1153
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
118 def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery): |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
119 key = (subj, measurementName, tuple(sorted(tags.items()))) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
120 if key in self.lastSent: |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
121 lastTime, lastValue = self.lastSent[key] |
1153
e4f49cd9dda3
add :pointsAtLeastEvery control
drewp <drewp@bigasterisk.com>
parents:
1142
diff
changeset
|
122 if lastValue == value and lastTime > now - pointsAtLeastEvery: |
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
123 log.debug('skip influx point %r', key) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
124 return False |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
125 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
126 self.lastSent[key] = (now, value) |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
127 return True |