293
|
1 import time, logging
|
|
2 from influxdb import InfluxDBClient
|
|
3 from rdflib import Namespace
|
|
4
|
|
# Module-wide logger. getLogger() with no name returns the root logger, so
# messages from this module follow the root logger's level and handlers.
log = logging.getLogger()
# RDF namespace for the vocabulary terms looked up in this module
# (e.g. ROOM['influxMeasurement'], ROOM['predicate'], ROOM['tag']).
ROOM = Namespace('http://projects.bigasterisk.com/room/')
|
|
7
|
|
class InfluxExporter(object):
    """Send the values of selected RDF statements to InfluxDB as points.

    Which statements are exported is read from the config graph: a subject is
    linked to a measurement node with ROOM['influxMeasurement'], and that
    node carries ROOM['predicate'] (the predicate to watch on the subject),
    ROOM['measurement'] (the influx measurement name) and zero or more
    ROOM['tag'] nodes, each with a ROOM['key'] and a ROOM['value'].

    An unchanged value for the same (subject, measurement, tags) is not sent
    again until `resendAfterSecs` seconds have passed.
    """

    # Seconds after which an unchanged value is sent again anyway.
    resendAfterSecs = 3600

    def __init__(self, configGraph, influxHost='bang6'):
        """Connect to influx and index the measurements found in configGraph.

        configGraph: graph object providing subject_objects(), value() and
            objects() (presumably an rdflib Graph -- confirm with callers).
        influxHost: host name of the InfluxDB server.
        """
        self.graph = configGraph
        # Positional args are host, port, username, password, database.
        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
        # Initialised here; not read or updated anywhere else in this class.
        self.lastExport = 0
        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)

        self.measurements = {}  # (subj, predicate) : measurement
        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m

    def exportToInflux(self, currentStatements):
        """Write one point for each configured statement that needs sending.

        currentStatements: iterable of (subject, predicate, object) triples.
            Triples whose (subject, predicate) is not a configured
            measurement are ignored.

        All points of one call share the same timestamp (whole seconds) and
        go out in a single write_points() call; nothing is written when there
        are no points.

        If anything raises (an unsupported value, or the write itself), the
        de-dup state in self.lastSent is put back to what it was before the
        call and the exception propagates. Otherwise the points that were
        never delivered would be remembered as sent and identical values
        would be suppressed until the re-send interval expired.
        """
        graph = self.graph
        now = int(time.time())

        log.debug('influxdb export:')

        # Values are immutable tuples, so a shallow copy is a full snapshot.
        sentBefore = dict(self.lastSent)
        try:
            points = []
            for stmt in currentStatements:
                measKey = (stmt[0], stmt[1])
                if measKey not in self.measurements:
                    continue
                meas = self.measurements[measKey]
                measurementName = graph.value(meas, ROOM['measurement'])
                tags = {}
                for t in graph.objects(meas, ROOM['tag']):
                    k = graph.value(t, ROOM['key']).toPython()
                    tags[k] = graph.value(t, ROOM['value']).toPython()

                value = self.influxValue(stmt[2])

                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
                                               tags, value):
                    continue

                points.append({
                    'measurement': measurementName,
                    "tags": tags,
                    "fields": {"value": value},
                    "time": now
                })
                log.debug('send to influx %r', points[-1])
            if points:
                self.influx.write_points(points, time_precision='s')
        except Exception:
            # Restore in place so other references to the dict stay valid.
            self.lastSent.clear()
            self.lastSent.update(sentBefore)
            raise

    def influxValue(self, rdfValue):
        """Return the number to store in influx for an RDF object value.

        ROOM['motion'] maps to 1 and ROOM['noMotion'] to 0. Anything else is
        converted with its toPython() method and must come out as an int or
        float (bool passes too, being an int subclass).

        Raises NotImplementedError for any other converted type.
        """
        if rdfValue == ROOM['motion']:
            value = 1
        elif rdfValue == ROOM['noMotion']:
            value = 0
        else:
            value = rdfValue.toPython()
            if not isinstance(value, (int, float)):
                # 1-tuple keeps the formatting correct if value is a tuple.
                raise NotImplementedError('value=%r' % (value,))
        return value

    def shouldSendNewPoint(self, now, subj, measurementName, tags, value):
        """Decide whether this value should be sent, and record it if so.

        Returns False when the same value was recorded for the same
        (subj, measurementName, tags) less than `resendAfterSecs` seconds
        before `now`. Otherwise stores (now, value) in self.lastSent and
        returns True.

        Note the side effect: a True result marks the value as sent, so the
        caller is expected to actually send it.
        """
        # Sorted items give a hashable key that ignores dict ordering.
        key = (subj, measurementName, tuple(sorted(tags.items())))
        if key in self.lastSent:
            lastTime, lastValue = self.lastSent[key]
            if lastValue == value and lastTime > now - self.resendAfterSecs:
                log.debug('skip influx point %r', key)
                return False

        self.lastSent[key] = (now, value)
        return True