Mercurial > code > home > repos > homeauto
annotate lib/export_to_influxdb.py @ 1189:c1cf544711da
start tiny_screen
Ignore-this: 3fb7ee4bea3f9f660fd275de2964afc4
darcs-hash:af475a8625caaad03e64f15ccf9d13ccd76b1323
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Tue, 01 Jan 2019 10:15:25 -0800 |
parents | e4f49cd9dda3 |
children | b50a13ef20ba |
rev | line source |
---|---|
1142
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
1 import time, logging |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
2 from influxdb import InfluxDBClient |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
3 from rdflib import Namespace |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
4 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
# Module-scoped logger: records carry this module's name and still propagate
# to whatever handlers the application installs on the root logger.
log = logging.getLogger(__name__)

# Namespace used to build every ROOM[...] term this module looks up in the
# config graph and compares statement objects against.
ROOM = Namespace('http://projects.bigasterisk.com/room/')
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
7 |
eb36b30f53b9
move export_to_influxdb up to lib
drewp <drewp@bigasterisk.com>
parents:
diff
changeset
|
class InfluxExporter(object):
    """Send selected RDF statements to InfluxDB as measurement points.

    The config graph is searched for ROOM['influxMeasurement'] links; each
    linked measurement node names the predicate to watch (ROOM['predicate']),
    the measurement name (ROOM['measurement']), optional tags (ROOM['tag']
    nodes with ROOM['key'] / ROOM['value']) and an optional resend interval
    (ROOM['pointsAtLeastEvery']).

    Unchanged values are not resent until the resend interval has elapsed.
    """

    def __init__(self, configGraph, influxHost='bang6', influxPort=9060,
                 influxUser='root', influxPassword='root',
                 influxDatabase='main'):
        """Index the configured measurements and create the InfluxDB client.

        configGraph must provide subject_objects(), value() and objects()
        (the rdflib Graph query API). The connection parameters are passed
        positionally to InfluxDBClient; their defaults are the values that
        used to be hard-coded here.
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, influxPort, influxUser,
                                     influxPassword, influxDatabase)
        # Not updated anywhere in this class; kept because external code may
        # read it.
        self.lastExport = 0
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Write one point for each statement that matches a measurement.

        currentStatements is an iterable of indexable statements whose items
        0, 1 and 2 are subject, predicate and object. Statements with no
        configured measurement, and points suppressed by
        shouldSendNewPoint(), are skipped. All remaining points share one
        timestamp (whole seconds) and are written in a single call.

        If building a point or writing the batch raises, the lastSent
        entries recorded during this call are restored before the exception
        propagates, so points that never reached InfluxDB are not remembered
        as sent.
        """
        now = int(time.time())
        points = []
        undo = {}  # lastSent key : entry before this call (None if absent)
        try:
            for stmt in currentStatements:
                point = self._pointForStatement(now, stmt, undo)
                if point is None:
                    continue
                points.append(point)
                log.debug('send to influx %r', point)
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            self._restoreLastSent(undo)
            raise

    def _pointForStatement(self, now, stmt, undo):
        """Return the point dict for stmt, or None if nothing should be sent.

        When a point is returned, the lastSent entry that was replaced is
        remembered in undo so the caller can roll it back.
        """
        graph = self.graph
        meas = self.measurements.get((stmt[0], stmt[1]))
        if meas is None:
            return None

        measurementName = graph.value(meas, ROOM['measurement'])
        tags = {}
        for t in graph.objects(meas, ROOM['tag']):
            k = graph.value(t, ROOM['key']).toPython()
            tags[k] = graph.value(t, ROOM['value']).toPython()

        value = self.influxValue(stmt[2])

        pale = 3600
        configured = graph.value(meas, ROOM['pointsAtLeastEvery'])
        # Compare against None rather than truthiness so that an explicitly
        # configured interval of 0 is honored instead of falling back to the
        # default.
        if configured is not None:
            pale = configured.toPython()

        key = self._lastSentKey(stmt[0], measurementName, tags)
        previous = self.lastSent.get(key)
        if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                       tags, value, pointsAtLeastEvery=pale):
            return None
        # setdefault keeps the oldest entry if the same key is sent more than
        # once within a single batch.
        undo.setdefault(key, previous)

        return {
            'measurement': measurementName,
            "tags": tags,
            "fields": {"value": value},
            "time": now
        }

    def _restoreLastSent(self, undo):
        """Put back the lastSent entries captured in undo."""
        for key, previous in undo.items():
            if previous is None:
                self.lastSent.pop(key, None)
            else:
                self.lastSent[key] = previous

    def influxValue(self, rdfValue):
        """Convert an RDF object term into a numeric field value.

        ROOM['motion'] and ROOM['pressed'] become 1; ROOM['noMotion'] and
        ROOM['notPressed'] become 0. Anything else is converted with
        toPython() and must come out as an int or float.

        Raises NotImplementedError for any other converted type.
        """
        if rdfValue in (ROOM['motion'], ROOM['pressed']):
            return 1
        if rdfValue in (ROOM['noMotion'], ROOM['notPressed']):
            return 0
        value = rdfValue.toPython()
        if not isinstance(value, (int, float)):
            raise NotImplementedError('value=%r' % value)
        return value

    @staticmethod
    def _lastSentKey(subj, measurementName, tags):
        """Build the hashable lastSent key; tags are sorted for stability."""
        return (subj, measurementName, tuple(sorted(tags.items())))

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery):
        """Decide whether a point is worth sending, and record it if so.

        Returns False when the last recorded value for this
        (subj, measurementName, tags) combination equals value and was
        recorded less than pointsAtLeastEvery seconds before now. Otherwise
        stores (now, value) in lastSent and returns True.
        """
        key = self._lastSentKey(subj, measurementName, tags)
        last = self.lastSent.get(key)
        if last is not None:
            lastTime, lastValue = last
            if lastValue == value and lastTime > now - pointsAtLeastEvery:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True