Mercurial > code > home > repos > homeauto
changeset 293:fc0e42933baa
save data to influxdb, not graphite
Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292
author | drewp@bigasterisk.com |
---|---|
date | Mon, 01 Aug 2016 02:26:38 -0700 |
parents | 105969d248d6 |
children | 14ac4a210dbc |
files | service/arduinoNode/arduinoNode.py service/piNode/export_to_influxdb.py service/piNode/piNode.py service/piNode/requirements.txt |
diffstat | 4 files changed, 88 insertions(+), 48 deletions(-) [+] |
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py Mon Aug 01 02:24:50 2016 -0700 +++ b/service/arduinoNode/arduinoNode.py Mon Aug 01 02:26:38 2016 -0700 @@ -31,8 +31,8 @@ from light9.rdfdb.patch import Patch from light9.rdfdb.rdflibpatch import inContext -sys.path.append("/my/proj/room") -from carbondata import CarbonClient +sys.path.append("../piNode") +from export_to_influxdb import InfluxExporter log = logging.getLogger() logging.getLogger('serial').setLevel(logging.WARN) @@ -87,7 +87,7 @@ self._statementsFromInputs = {} # input device uri: latest statements self._lastPollTime = {} # input device uri: time() - self._carbon = CarbonClient(serverHost='bang') + self._influx = InfluxExporter(self.configGraph) self.open() for d in self._devs: self.syncMasterGraphToHostStatements(d) @@ -159,7 +159,9 @@ elapsed = time.time() - t1 if elapsed > 1.0: log.warn('poll took %.1f seconds' % elapsed) - self._exportToGraphite() + + self._influx.exportToInflux( + set.union([set(v) for v in self._statementsFromInputs.values()])) def _sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) @@ -170,24 +172,6 @@ headers={'Content-Type': ['text/n3']}, postdata=body, timeout=5) - def _exportToGraphite(self): - # note this is writing way too often- graphite is storing at a lower res - now = time.time() - # 20 sec is not precise; just trying to reduce wifi traffic - if getattr(self, 'lastGraphiteExport', 0) + 20 > now: - return - self.lastGraphiteExport = now - log.debug('graphite export:') - # objects of these statements are suitable as graphite values. - graphitePredicates = {ROOM['temperatureF']} - # bug: one sensor can have temp and humid- this will be ambiguous - for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']): - for group in self._statementsFromInputs.values(): - for stmt in group: - if stmt[0] == s and stmt[1] in graphitePredicates: - log.debug(' sending %s -> %s', stmt[0], graphiteName) - self._carbon.send(graphiteName, stmt[2].toPython(), now) - def outputStatements(self, stmts): unused = set(stmts) for dev in self._devs:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/piNode/export_to_influxdb.py Mon Aug 01 02:26:38 2016 -0700 @@ -0,0 +1,71 @@ +import time, logging +from influxdb import InfluxDBClient +from rdflib import Namespace + +log = logging.getLogger() +ROOM = Namespace('http://projects.bigasterisk.com/room/') + +class InfluxExporter(object): + def __init__(self, configGraph, influxHost='bang6'): + self.graph = configGraph + self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.lastExport = 0 + self.lastSent = {} # (subj, measurementName, tags) : (time, value) + + self.measurements = {} # (subj, predicate) : measurement + for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): + self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m + + def exportToInflux(self, currentStatements): + graph = self.graph + now = int(time.time()) + + log.debug('influxdb export:') + + points = [] + for stmt in currentStatements: + if (stmt[0], stmt[1]) in self.measurements: + meas = self.measurements[(stmt[0], stmt[1])] + measurementName = graph.value(meas, ROOM['measurement']) + tags = {} + for t in graph.objects(meas, ROOM['tag']): + k = graph.value(t, ROOM['key']).toPython() + tags[k] = graph.value(t, ROOM['value']).toPython() + + value = self.influxValue(stmt[2]) + + if not self.shouldSendNewPoint(now, stmt[0], measurementName, + tags, value): + continue + + points.append({ + 'measurement': measurementName, + "tags": tags, + "fields": {"value": value}, + "time": now + }) + log.debug('send to influx %r', points[-1]) + if points: + self.influx.write_points(points, time_precision='s') + + def influxValue(self, rdfValue): + if rdfValue == ROOM['motion']: + value = 1 + elif rdfValue == ROOM['noMotion']: + value = 0 + else: + value = rdfValue.toPython() + if not isinstance(value, (int, float)): + raise NotImplementedError('value=%r' % value) + return value + + def shouldSendNewPoint(self, now, subj, measurementName, tags, value): + key = (subj, measurementName, tuple(sorted(tags.items()))) + if key in self.lastSent: + lastTime, lastValue = self.lastSent[key] + if lastValue == value and lastTime > now - 3600: + log.debug('skip influx point %r', key) + return False + + self.lastSent[key] = (now, value) + return True
--- a/service/piNode/piNode.py Mon Aug 01 02:24:50 2016 -0700 +++ b/service/piNode/piNode.py Mon Aug 01 02:26:38 2016 -0700 @@ -22,9 +22,7 @@ return None import devices - -# from /my/proj/room -from carbondata import CarbonClient +from export_to_influxdb import InfluxExporter log = logging.getLogger() logging.getLogger('serial').setLevel(logging.WARN) @@ -74,7 +72,7 @@ log.debug('found %s devices', len(self._devs)) self._statementsFromInputs = {} # input device uri: latest statements self._lastPollTime = {} # input device uri: time() - self._carbon = CarbonClient(serverHost='bang') + self._influx = InfluxExporter(self.graph) for d in self._devs: self.syncMasterGraphToHostStatements(d) @@ -99,7 +97,7 @@ new = new['latest'] else: oneshot = None - prev = self._statementsFromInputs.get(i.uri, []) + prev = self._statementsFromInputs.get(i.uri, set()) if new or prev: self._statementsFromInputs[i.uri] = new @@ -116,13 +114,13 @@ if oneshot: self._sendOneshot(oneshot) self._lastPollTime[i.uri] = now - self._exportToGraphite() + self._influx.exportToInflux( + set.union(*[set(v) for v in self._statementsFromInputs.values()])) def _sendOneshot(self, oneshot): body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3()) for s,p,o in oneshot)).encode('utf8') - bang = '[fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa]' - url = 'http://%s:9071/oneShot' % bang + url = 'http://[%s]:9071/oneShot' % bang6 d = fetch(method='POST', url=url, headers={'Content-Type': ['text/n3']}, @@ -133,24 +131,6 @@ url, e.getErrorMessage()) d.addErrback(err) - def _exportToGraphite(self): - # note this is writing way too often- graphite is storing at a lower res - now = time.time() - # 20 sec is not precise; just trying to reduce wifi traffic - if getattr(self, 'lastGraphiteExport', 0) + 20 > now: - return - self.lastGraphiteExport = now - log.debug('graphite export:') - # objects of these statements are suitable as graphite values. - graphitePredicates = {ROOM['temperatureF']} - # bug: one sensor can have temp and humid- this will be ambiguous - for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']): - for group in self._statementsFromInputs.values(): - for stmt in group: - if stmt[0] == s and stmt[1] in graphitePredicates: - log.debug(' sending %s -> %s', stmt[0], graphiteName) - self._carbon.send(graphiteName, stmt[2].toPython(), now) - def outputStatements(self, stmts): unused = set(stmts) for dev in self._devs:
--- a/service/piNode/requirements.txt Mon Aug 01 02:24:50 2016 -0700 +++ b/service/piNode/requirements.txt Mon Aug 01 02:26:38 2016 -0700 @@ -5,3 +5,8 @@ python-dateutil w1thermsensor service_identity==16.0.0 +git+git://github.com/adafruit/Adafruit_Python_DHT#egg=DHT +http://abyz.co.uk/rpi/pigpio/pigpio.zip +git+git://github.com/adafruit/Adafruit_Nokia_LCD#egg=Nokia_LCD +RPi.GPIO==0.6.2 +influxdb==3.0.0