Mercurial > code > home > repos > homeauto
annotate lib/export_to_influxdb.py @ 348:bf2174646809
add :pointsAtLeastEvery control
Ignore-this: 9d0236b56b2a7592211ca68b87b4a5d1
author | drewp@bigasterisk.com |
---|---|
date | Sun, 15 Apr 2018 04:41:00 -0700 |
parents | 1756a5519026 |
children | fcd2c026f51e |
rev | line source |
---|---|
293 | 1 import time, logging |
2 from influxdb import InfluxDBClient | |
3 from rdflib import Namespace | |
4 | |
# Module-wide logger. getLogger() without a name returns the root logger, so
# these messages follow whatever root logging config the host program sets up.
log = logging.getLogger()
# Namespace for the room vocabulary; ROOM['term'] builds the URI for `term`
# (used below for both the config predicates and the sensor state values).
ROOM = Namespace('http://projects.bigasterisk.com/room/')
7 | |
class InfluxExporter(object):
    """Forward selected RDF statements to InfluxDB as measurement points.

    The config graph selects what gets exported: each
    (subj, ROOM['influxMeasurement'], meas) triple declares a measurement
    node whose ROOM['predicate'] names the predicate to watch on subj. The
    measurement node also supplies the influx measurement name
    (ROOM['measurement']), optional tags (ROOM['tag'] nodes with ROOM['key']
    and ROOM['value']) and an optional ROOM['pointsAtLeastEvery'] interval in
    seconds.

    Unchanged values are not re-sent until pointsAtLeastEvery seconds have
    passed since the last point for the same (subject, measurement, tags).
    """

    # Re-send interval, in seconds, for measurements that do not configure
    # ROOM['pointsAtLeastEvery'].
    defaultPointsAtLeastEvery = 3600

    def __init__(self, configGraph, influxHost='bang6'):
        """Index the measurements declared in configGraph and create the client.

        configGraph: graph object providing subject_objects(), value() and
            objects() lookups for the ROOM config terms.
        influxHost: host name handed to InfluxDBClient (port 9060, user and
            password 'root', database 'main').
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        self.lastExport = 0
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Write a point for every configured statement that is due.

        currentStatements: iterable of (subject, predicate, object) triples.
            Triples whose (subject, predicate) is not in self.measurements
            are ignored.

        All points of one call share the same whole-second timestamp and are
        written in a single write_points() call; nothing is written when no
        point is due.

        Exceptions from influxValue() or the influx client propagate to the
        caller. In that case self.lastSent is put back to its state from
        before the call, so the points are offered again next time.
        """
        now = int(time.time())

        # shouldSendNewPoint() marks a point as sent the moment it accepts
        # it. Snapshot the bookkeeping so a failure while collecting or
        # writing does not suppress points that never reached influx.
        sentBefore = dict(self.lastSent)
        try:
            points = self._collectPoints(currentStatements, now)
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            # Restore in place so other references to the dict stay valid.
            self.lastSent.clear()
            self.lastSent.update(sentBefore)
            raise

    def _collectPoints(self, currentStatements, now):
        """Return the influx point dicts that should be sent at time `now`."""
        graph = self.graph
        points = []
        for stmt in currentStatements:
            measKey = (stmt[0], stmt[1])
            if measKey not in self.measurements:
                continue
            meas = self.measurements[measKey]

            measurementName = graph.value(meas, ROOM['measurement'])
            tags = {}
            for t in graph.objects(meas, ROOM['tag']):
                k = graph.value(t, ROOM['key']).toPython()
                tags[k] = graph.value(t, ROOM['value']).toPython()

            value = self.influxValue(stmt[2])

            # Look the setting up once and compare against None: an rdflib
            # Literal is falsy when its Python value is (e.g. 0), so a plain
            # truth test would replace a configured 0 with the default.
            paleNode = graph.value(meas, ROOM['pointsAtLeastEvery'],
                                   default=None)
            if paleNode is None:
                pale = self.defaultPointsAtLeastEvery
            else:
                pale = paleNode.toPython()

            if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                           tags, value,
                                           pointsAtLeastEvery=pale):
                continue

            points.append({
                'measurement': measurementName,
                "tags": tags,
                "fields": {"value": value},
                "time": now
            })
            log.debug('send to influx %r', points[-1])
        return points

    def influxValue(self, rdfValue):
        """Convert the object of a statement into a number for influx.

        ROOM['motion'] and ROOM['pressed'] become 1, ROOM['noMotion'] and
        ROOM['notPressed'] become 0. Any other node is converted with
        toPython() and must come out as an int or float.

        Raises NotImplementedError for values that convert to anything else.
        """
        if rdfValue in [ROOM['motion'], ROOM['pressed']]:
            value = 1
        elif rdfValue in [ROOM['noMotion'], ROOM['notPressed']]:
            value = 0
        else:
            value = rdfValue.toPython()
            if not isinstance(value, (int, float)):
                raise NotImplementedError('value=%r' % value)
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value, pointsAtLeastEvery):
        """Decide whether a point should be sent, and record it if so.

        A point is skipped only when the previous point for the same
        (subj, measurementName, tags) had an equal value and was sent less
        than pointsAtLeastEvery seconds before `now`. Once exactly
        pointsAtLeastEvery seconds have passed, the value is sent again.

        Returns True when the point should be sent (self.lastSent is updated
        to (now, value)), False when it should be skipped (self.lastSent is
        left alone).
        """
        # Sort the tag items so equal tag dicts always give the same key.
        key = (subj, measurementName, tuple(sorted(tags.items())))
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - pointsAtLeastEvery:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True