annotate lib/export_to_influxdb.py @ 1308:68c04c74db71

rm old driver and web color picker for shiftbrites and some other 'sba' board Ignore-this: 8b7d7b6bd64ad0c635f304d7d94820f0 darcs-hash:c0e036714353e7d3255c42975d4be7739f6134f8
author drewp <drewp@bigasterisk.com>
date Sun, 21 Apr 2019 03:14:14 -0700
parents b50a13ef20ba
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1232
b50a13ef20ba exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents: 1153
diff changeset
1 import time, logging, math, os, sys, socket
1142
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
2 from influxdb import InfluxDBClient
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
3 from rdflib import Namespace
1232
b50a13ef20ba exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents: 1153
diff changeset
4 from twisted.internet import task
1142
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
5
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
# Module-wide logger. getLogger() with no name returns the root logger.
log = logging.getLogger()
# Namespace for the room vocabulary; the config-graph lookups in this module
# (influxMeasurement, predicate, measurement, tag, ...) are all terms in it.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
8
1232
b50a13ef20ba exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents: 1153
diff changeset
class RetentionPolicies(object):
    """Create day-based retention policies on an influx client, once each.

    The set of day counts already created is remembered per instance, so
    asking for the same duration again does not call the client again.
    """

    def __init__(self, influx):
        # Day counts for which create_retention_policy has already been
        # called through this instance.
        self.createdPolicies = set()
        self.influx = influx

    def getCreatedPolicy(self, days):
        """Return the policy name for `days`, creating the policy on first use.

        days: retention length in days; formatted with %d into both the
          policy name ('ret_<days>_day') and the duration ('<days>d').
        """
        policyName = 'ret_%d_day' % days
        if days in self.createdPolicies:
            return policyName

        self.influx.create_retention_policy(
            policyName,
            duration='%dd' % days,
            replication='1')
        # Only recorded after the client call returns, so a failed create
        # is attempted again on the next request.
        self.createdPolicies.add(days)
        return policyName
b50a13ef20ba exportStats for sending scales data to influxdb
drewp <drewp@bigasterisk.com>
parents: 1153
diff changeset
22
1142
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
class InfluxExporter(object):
    """Write points to InfluxDB from RDF statements or from a stats object.

    Which (subject, predicate) pairs are exported, and under which
    measurement name and tags, is read from the config graph using terms
    in the ROOM namespace.
    """

    def __init__(self, configGraph, influxHost='bang6', influxPort=9060,
                 influxUser='root', influxPassword='root',
                 influxDatabase='main'):
        """
        configGraph: graph queried with subject_objects(), value() and
          objects() (presumably an rdflib Graph -- confirm against callers).
        influxHost, influxPort, influxUser, influxPassword, influxDatabase:
          passed positionally to InfluxDBClient. The defaults are the
          values that used to be hard-coded here.
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, influxPort, influxUser,
                                     influxPassword, influxDatabase)
        self.retentionPolicies = RetentionPolicies(self.influx)
        # (subj, measurementName, sorted tag items) : (time, value)
        self.lastSent = {}
        # time of the last exportStats write; 0 means none yet
        self.lastExport = 0

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    @staticmethod
    def _lookupStat(stats, comps):
        """Walk `comps` down from `stats`; return None if any step is missing.

        Each step tries attribute access first and falls back to item
        access, matching the lookup order exportStats has always used.
        """
        node = stats
        for comp in comps:
            child = getattr(node, comp, None)
            if child is None:
                try:
                    child = node[comp]
                except (KeyError, IndexError, TypeError):
                    # missing key, or a node that can't be indexed
                    return None
            if child is None:
                return None
            node = child
        return node

    def exportStats(self, stats, paths, period_secs=10, retain_days=7):
        """Start a repeating task that writes selected `stats` values to influx.

        stats: object whose values are reached by attribute or item access.
        paths: dotted strings. The first component is dropped; the
          remaining ones are the lookup steps. All but the last are
          appended to the measurement name and the last is the field name.
        period_secs: interval between writes. The LoopingCall is started
          with now=False, so nothing is written at call time.
        retain_days: points are written under the 'ret_<N>_day' retention
          policy, which is created on first use.

        Paths that cannot be resolved, or whose value is NaN or not a
        number, are logged (NaN is skipped silently) and left out of the
        write instead of raising.
        """
        # graphite version of this in scales/graphite.py
        base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]]
        tags = {'host': socket.gethostname()}

        def send():
            now = int(time.time())
            points = []
            for path in paths:
                comps = path.split('.')[1:]
                x = self._lookupStat(stats, comps) if comps else None
                if x is None:
                    # Skip rather than raise: per the Twisted docs an
                    # exception escaping a LoopingCall function stops the
                    # loop, which would end all further stats exports.
                    log.warning("no path %s", path)
                    continue
                try:
                    if math.isnan(x):
                        continue
                except TypeError:
                    log.warning("non-numeric stat at %s: %r", path, x)
                    continue
                points.append({
                    'measurement': '.'.join(base + comps[:-1]),
                    "tags": tags,
                    "fields": {comps[-1]: x},
                    "time": now
                })
            if points:
                self.influx.write_points(
                    points, time_precision='s',
                    retention_policy=self.retentionPolicies.getCreatedPolicy(
                        days=retain_days))
                if self.lastExport == 0:
                    # only the first successful write is logged at info
                    log.info('writing stats to %r', points)
                self.lastExport = now

        task.LoopingCall(send).start(period_secs, now=False)

    def exportToInflux(self, currentStatements):
        """Write one point for each statement that has a configured measurement.

        currentStatements: iterable of indexable statements; stmt[0] and
          stmt[1] select the measurement config and stmt[2] is the value.

        Statements without a config entry are ignored. Unchanged values
        are rate-limited by shouldSendNewPoint, using the measurement's
        ROOM['pointsAtLeastEvery'] setting or 3600 when it has none.

        Raises NotImplementedError (from influxValue) if a value is not a
        number.
        """
        graph = self.graph
        now = int(time.time())

        points = []
        for stmt in currentStatements:
            meas = self.measurements.get((stmt[0], stmt[1]))
            if meas is None:
                continue
            measurementName = graph.value(meas, ROOM['measurement'])
            tags = {}
            for t in graph.objects(meas, ROOM['tag']):
                k = graph.value(t, ROOM['key']).toPython()
                tags[k] = graph.value(t, ROOM['value']).toPython()

            value = self.influxValue(stmt[2])

            pale = 3600
            paleNode = graph.value(meas, ROOM['pointsAtLeastEvery'],
                                   default=None)
            # Compare against None, not truthiness: a configured 0 is a
            # falsy literal and must not fall back to the default.
            if paleNode is not None:
                pale = paleNode.toPython()

            # NOTE(review): shouldSendNewPoint records the point in
            # self.lastSent before write_points runs, so if the write
            # below fails the repeat is still suppressed.
            if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                           tags, value,
                                           pointsAtLeastEvery=pale):
                continue

            points.append({
                'measurement': measurementName,
                "tags": tags,
                "fields": {"value": value},
                "time": now
            })
            log.debug('send to influx %r', points[-1])
        if points:
            self.influx.write_points(points, time_precision='s')

    def influxValue(self, rdfValue):
        """Convert an RDF value to the number stored in influx.

        ROOM motion/pressed map to 1 and noMotion/notPressed map to 0.
        Anything else is converted with toPython().

        Raises NotImplementedError if the converted value is not an int or
        float.
        """
        if rdfValue in [ROOM['motion'], ROOM['pressed']]:
            value = 1
        elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]:
            value = 0
        else:
            value = rdfValue.toPython()
            if not isinstance(value, (int, float)):
                raise NotImplementedError('value=%r' % value)
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value,
                           pointsAtLeastEvery):
        """Decide whether a point should be sent, and record it if so.

        Returns False only when the same (subj, measurementName, tags)
        last sent an equal value less than `pointsAtLeastEvery` before
        `now`. Otherwise stores (now, value) in self.lastSent and returns
        True.
        """
        key = (subj, measurementName, tuple(sorted(tags.items())))
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - pointsAtLeastEvery:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True