annotate lib/export_to_influxdb.py @ 1209:9f6e6692f57b

workshop pi device config Ignore-this: c99682d90fa3ffee4373303d74af241b darcs-hash:f697bbd44106cf60c1754633e9395e3a12f512a8
author drewp <drewp@bigasterisk.com>
date Tue, 12 Mar 2019 00:11:07 -0700
parents e4f49cd9dda3
children b50a13ef20ba
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
1142
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
1 import time, logging
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
2 from influxdb import InfluxDBClient
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
3 from rdflib import Namespace
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
4
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
# Module-scoped logger: records carry this module's name (and can be filtered
# per module) while still propagating to whatever handlers the root logger has.
log = logging.getLogger(__name__)
# Namespace for the room vocabulary terms used in the config graph and in the
# statements being exported.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
7
eb36b30f53b9 move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff changeset
class InfluxExporter(object):
    """Write selected RDF statements to InfluxDB as measurement points.

    The config graph attaches ROOM['influxMeasurement'] nodes to subjects.
    Each such node is read for ROOM['predicate'] (which statements to
    export), ROOM['measurement'] (the influx measurement name), ROOM['tag']
    nodes carrying ROOM['key'] / ROOM['value'], and an optional
    ROOM['pointsAtLeastEvery'] repeat interval.
    """

    # Repeat interval used when a measurement has no ROOM['pointsAtLeastEvery'].
    DEFAULT_POINTS_AT_LEAST_EVERY = 3600

    def __init__(self, configGraph, influxHost='bang6'):
        """Index the measurement config and create the influx client.

        configGraph: graph object queried with subject_objects(), value()
            and objects() (presumably an rdflib Graph -- confirm with caller).
        influxHost: host passed to InfluxDBClient; the port (9060),
            credentials ('root'/'root') and database ('main') are fixed here.
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        self.lastExport = 0
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Send one point for each configured statement that needs sending.

        currentStatements: iterable of indexable (subject, predicate, object)
            statements. Statements whose (subject, predicate) pair has no
            configured measurement are ignored.

        Raises whatever influxValue() or the influx client raises. In that
        case self.lastSent is restored to its state before the call, so the
        points are not treated as already sent and will be retried.
        """
        graph = self.graph
        now = int(time.time())

        # shouldSendNewPoint() records points as sent before the write
        # happens; keep a snapshot so a failed batch can be undone.
        sentBefore = dict(self.lastSent)
        try:
            points = []
            for stmt in currentStatements:
                meas = self.measurements.get((stmt[0], stmt[1]))
                if meas is None:
                    continue
                measurementName = graph.value(meas, ROOM['measurement'])
                tags = {}
                for t in graph.objects(meas, ROOM['tag']):
                    k = graph.value(t, ROOM['key']).toPython()
                    tags[k] = graph.value(t, ROOM['value']).toPython()

                value = self.influxValue(stmt[2])

                # Compare against None rather than testing truthiness: a
                # configured interval of 0 is a falsy literal but is still a
                # deliberate setting (never suppress repeated values).
                pale = self.DEFAULT_POINTS_AT_LEAST_EVERY
                paleNode = graph.value(meas, ROOM['pointsAtLeastEvery'])
                if paleNode is not None:
                    pale = paleNode.toPython()

                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                               tags, value,
                                               pointsAtLeastEvery=pale):
                    continue

                points.append({
                    'measurement': measurementName,
                    "tags": tags,
                    "fields": {"value": value},
                    "time": now
                })
                log.debug('send to influx %r', points[-1])
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            # Nothing from this batch is known to have been stored.
            self.lastSent = sentBefore
            raise

    def influxValue(self, rdfValue):
        """Convert an RDF object term to the number stored in influx.

        ROOM['motion'] / ROOM['pressed'] map to 1 and ROOM['noMotion'] /
        ROOM['notPressed'] map to 0. Anything else is converted with
        toPython() and must come out as an int or float.

        Raises NotImplementedError for any other converted value.
        """
        if rdfValue in (ROOM['motion'], ROOM['pressed']):
            return 1
        if rdfValue in (ROOM['noMotion'], ROOM['notPressed']):
            return 0
        value = rdfValue.toPython()
        if not isinstance(value, (int, float)):
            raise NotImplementedError('value=%r' % value)
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery):
        """Decide whether a point is worth sending, and record it if so.

        Returns False only when the series (subj, measurementName, tags)
        last recorded an equal value at a time later than
        now - pointsAtLeastEvery. Otherwise stores (now, value) in
        self.lastSent for that series and returns True.
        """
        # Sort the tag items so equal tag dicts always give the same key.
        key = (subj, measurementName, tuple(sorted(tags.items())))
        previous = self.lastSent.get(key)
        if previous is not None:
            lastTime, lastValue = previous
            if lastValue == value and lastTime > now - pointsAtLeastEvery:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True