comparison lib/export_to_influxdb.py @ 1142:eb36b30f53b9

move export_to_influxdb up to lib Ignore-this: d43cc183b51bec93b28e6a7ba7bf1d6e darcs-hash:13dd7219f8eb69b32ce9ac7b93d0ff20fcf93e22
author drewp <drewp@bigasterisk.com>
date Sat, 03 Mar 2018 18:08:03 -0800
parents
children e4f49cd9dda3
comparison
equal deleted inserted replaced
1141:34fa399dc5c9 1142:eb36b30f53b9
1 import time, logging
2 from influxdb import InfluxDBClient
3 from rdflib import Namespace
4
5 log = logging.getLogger()
6 ROOM = Namespace('http://projects.bigasterisk.com/room/')
7
class InfluxExporter(object):
    """Forward selected RDF statements to InfluxDB as measurement points.

    The config graph declares, through ROOM['influxMeasurement'], which
    (subject, predicate) pairs are exported and which measurement name and
    tags each one is written with. A value that has not changed is re-sent
    only once `resendSecs` seconds have passed since it was last sent.
    """

    # Seconds after which an unchanged value is sent again anyway.
    resendSecs = 3600

    def __init__(self, configGraph, influxHost='bang6'):
        """
        configGraph: graph holding the ROOM['influxMeasurement']
          descriptions. It is read here to index the measurements and again
          on every export for measurement names and tags.
        influxHost: host name passed to InfluxDBClient (port 9060).
        """
        self.graph = configGraph
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        self.lastExport = 0  # not read anywhere in this class
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Write one point for each statement that maps to a measurement.

        currentStatements: iterable of statements indexable as
          (subject, predicate, object, ...). Statements whose
          (subject, predicate) has no configured measurement are ignored.

        All points of one call share a single timestamp in whole seconds
        and are sent in one write_points call; nothing is written when no
        point is due.

        Raises NotImplementedError (from influxValue) for an object that is
        not numeric, and whatever write_points raises. When anything raises,
        self.lastSent is restored to its state from before the call, so
        points that never reached influx are not skipped as duplicates on
        the next export.
        """
        graph = self.graph
        now = int(time.time())

        # shouldSendNewPoint marks a point as sent while the batch is still
        # being built, so keep the previous state to undo that on failure.
        sentBefore = dict(self.lastSent)

        points = []
        try:
            for stmt in currentStatements:
                meas = self.measurements.get((stmt[0], stmt[1]))
                if meas is None:
                    continue

                measurementName = graph.value(meas, ROOM['measurement'])
                tags = {}
                for t in graph.objects(meas, ROOM['tag']):
                    k = graph.value(t, ROOM['key']).toPython()
                    tags[k] = graph.value(t, ROOM['value']).toPython()

                value = self.influxValue(stmt[2])

                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                               tags, value):
                    continue

                points.append({
                    'measurement': measurementName,
                    "tags": tags,
                    "fields": {"value": value},
                    "time": now
                })
                log.debug('send to influx %r', points[-1])
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            # Mutate in place so outside references to lastSent stay valid.
            self.lastSent.clear()
            self.lastSent.update(sentBefore)
            raise

    def influxValue(self, rdfValue):
        """Return the number to store in influx for an RDF object node.

        ROOM['motion'] and ROOM['pressed'] map to 1; ROOM['noMotion'] and
        ROOM['notPressed'] map to 0. Any other node is converted with its
        toPython() method.

        Raises NotImplementedError when the converted value is neither an
        int nor a float.
        """
        if rdfValue in (ROOM['motion'], ROOM['pressed']):
            return 1
        if rdfValue in (ROOM['noMotion'], ROOM['notPressed']):
            return 0

        value = rdfValue.toPython()
        if not isinstance(value, (int, float)):
            # Wrap in a tuple so a tuple-valued result still formats as %r.
            raise NotImplementedError('value=%r' % (value,))
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value):
        """Decide whether a point should be sent, recording it when so.

        now: timestamp of the candidate point, in seconds.
        subj, measurementName, tags: identify the series; tags is a dict
          and its item order does not matter.
        value: the candidate value.

        Returns False when the same series last carried an equal value less
        than `resendSecs` seconds before `now`. Otherwise stores
        (now, value) in self.lastSent for the series and returns True.
        """
        key = (subj, measurementName, tuple(sorted(tags.items())))
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - self.resendSecs:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True