changeset 1098:b5906f6fce3f

save data to influxdb, not graphite Ignore-this: ebf07e54d1949bdf3c2e9a81c5fc7292 darcs-hash:d65ec17d9811cc51bbbbdaef0d1730767bf768df
author drewp <drewp@bigasterisk.com>
date Mon, 01 Aug 2016 02:26:38 -0700
parents 3aef251c7585
children cb94ea3495b2
files service/arduinoNode/arduinoNode.py service/piNode/export_to_influxdb.py service/piNode/piNode.py service/piNode/requirements.txt
diffstat 4 files changed, 88 insertions(+), 48 deletions(-) [+]
line wrap: on
line diff
--- a/service/arduinoNode/arduinoNode.py	Mon Aug 01 02:24:50 2016 -0700
+++ b/service/arduinoNode/arduinoNode.py	Mon Aug 01 02:26:38 2016 -0700
@@ -31,8 +31,8 @@
 from light9.rdfdb.patch import Patch
 from light9.rdfdb.rdflibpatch import inContext
 
-sys.path.append("/my/proj/room")
-from carbondata import CarbonClient
+sys.path.append("../piNode")
+from export_to_influxdb import InfluxExporter
 
 log = logging.getLogger()
 logging.getLogger('serial').setLevel(logging.WARN)
@@ -87,7 +87,7 @@
         
         self._statementsFromInputs = {} # input device uri: latest statements
         self._lastPollTime = {} # input device uri: time()
-        self._carbon = CarbonClient(serverHost='bang')
+        self._influx = InfluxExporter(self.configGraph)
         self.open()
         for d in self._devs:
             self.syncMasterGraphToHostStatements(d)
@@ -159,7 +159,9 @@
         elapsed = time.time() - t1
         if elapsed > 1.0:
             log.warn('poll took %.1f seconds' % elapsed)
-        self._exportToGraphite()
+
+        self._influx.exportToInflux(
+            set.union([set(v) for v in self._statementsFromInputs.values()]))
 
     def _sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
@@ -170,24 +172,6 @@
               headers={'Content-Type': ['text/n3']}, postdata=body,
               timeout=5)
 
-    def _exportToGraphite(self):
-        # note this is writing way too often- graphite is storing at a lower res
-        now = time.time()
-        # 20 sec is not precise; just trying to reduce wifi traffic
-        if getattr(self, 'lastGraphiteExport', 0) + 20 > now:
-            return
-        self.lastGraphiteExport = now
-        log.debug('graphite export:')
-        # objects of these statements are suitable as graphite values.
-        graphitePredicates = {ROOM['temperatureF']}
-        # bug: one sensor can have temp and humid- this will be ambiguous
-        for s, graphiteName in self.configGraph.subject_objects(ROOM['graphiteName']):
-            for group in self._statementsFromInputs.values():
-                for stmt in group:
-                    if stmt[0] == s and stmt[1] in graphitePredicates:
-                        log.debug('  sending %s -> %s', stmt[0], graphiteName)
-                        self._carbon.send(graphiteName, stmt[2].toPython(), now)
-
     def outputStatements(self, stmts):
         unused = set(stmts)
         for dev in self._devs:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/piNode/export_to_influxdb.py	Mon Aug 01 02:26:38 2016 -0700
@@ -0,0 +1,71 @@
+import time, logging
+from influxdb import InfluxDBClient
+from rdflib import Namespace
+
+log = logging.getLogger()
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+class InfluxExporter(object):
+    def __init__(self, configGraph, influxHost='bang6'):
+        self.graph = configGraph
+        self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
+        self.lastExport = 0
+        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)
+
+        self.measurements = {} # (subj, predicate) : measurement
+        for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
+            self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m
+        
+    def exportToInflux(self, currentStatements):
+        graph = self.graph
+        now = int(time.time())
+      
+        log.debug('influxdb export:')
+
+        points = []
+        for stmt in currentStatements:
+            if (stmt[0], stmt[1]) in self.measurements:
+                meas = self.measurements[(stmt[0], stmt[1])]
+                measurementName = graph.value(meas, ROOM['measurement'])
+                tags = {}
+                for t in graph.objects(meas, ROOM['tag']):
+                    k = graph.value(t, ROOM['key']).toPython()
+                    tags[k] = graph.value(t, ROOM['value']).toPython()
+
+                value = self.influxValue(stmt[2])
+                    
+                if not self.shouldSendNewPoint(now, stmt[0], measurementName,
+                                               tags, value):
+                    continue
+                    
+                points.append({
+                    'measurement': measurementName,
+                    "tags": tags,
+                    "fields": {"value": value},
+                    "time": now
+                })
+                log.debug('send to influx %r', points[-1])
+        if points:
+            self.influx.write_points(points, time_precision='s')
+
+    def influxValue(self, rdfValue):
+        if rdfValue == ROOM['motion']:
+            value = 1
+        elif rdfValue == ROOM['noMotion']:
+            value = 0
+        else:
+            value = rdfValue.toPython()
+            if not isinstance(value, (int, float)):
+                raise NotImplementedError('value=%r' % value)
+        return value
+            
+    def shouldSendNewPoint(self, now, subj, measurementName, tags, value):
+        key = (subj, measurementName, tuple(sorted(tags.items())))
+        if key in self.lastSent:
+            lastTime, lastValue = self.lastSent[key]
+            if lastValue == value and lastTime > now - 3600:
+                log.debug('skip influx point %r', key)
+                return False
+
+        self.lastSent[key] = (now, value)
+        return True
--- a/service/piNode/piNode.py	Mon Aug 01 02:24:50 2016 -0700
+++ b/service/piNode/piNode.py	Mon Aug 01 02:26:38 2016 -0700
@@ -22,9 +22,7 @@
             return None
 
 import devices
-
-# from /my/proj/room
-from carbondata import CarbonClient
+from export_to_influxdb import InfluxExporter
 
 log = logging.getLogger()
 logging.getLogger('serial').setLevel(logging.WARN)
@@ -74,7 +72,7 @@
         log.debug('found %s devices', len(self._devs))
         self._statementsFromInputs = {} # input device uri: latest statements
         self._lastPollTime = {} # input device uri: time()
-        self._carbon = CarbonClient(serverHost='bang')
+        self._influx = InfluxExporter(self.graph)
         for d in self._devs:
             self.syncMasterGraphToHostStatements(d)
             
@@ -99,7 +97,7 @@
                 new = new['latest']
             else:
                 oneshot = None
-            prev = self._statementsFromInputs.get(i.uri, [])
+            prev = self._statementsFromInputs.get(i.uri, set())
 
             if new or prev:
                 self._statementsFromInputs[i.uri] = new
@@ -116,13 +114,13 @@
             if oneshot:
                 self._sendOneshot(oneshot)
             self._lastPollTime[i.uri] = now
-        self._exportToGraphite()
+        self._influx.exportToInflux(
+            set.union(*[set(v) for v in self._statementsFromInputs.values()]))
 
     def _sendOneshot(self, oneshot):
         body = (' '.join('%s %s %s .' % (s.n3(), p.n3(), o.n3())
                          for s,p,o in oneshot)).encode('utf8')
-        bang = '[fcb8:4119:fb46:96f8:8b07:1260:0f50:fcfa]'
-        url = 'http://%s:9071/oneShot' % bang
+        url = 'http://[%s]:9071/oneShot' % bang6
         d = fetch(method='POST',
                   url=url,
                   headers={'Content-Type': ['text/n3']},
@@ -133,24 +131,6 @@
                      url, e.getErrorMessage())
         d.addErrback(err)
 
-    def _exportToGraphite(self):
-        # note this is writing way too often- graphite is storing at a lower res
-        now = time.time()
-        # 20 sec is not precise; just trying to reduce wifi traffic
-        if getattr(self, 'lastGraphiteExport', 0) + 20 > now:
-            return
-        self.lastGraphiteExport = now
-        log.debug('graphite export:')
-        # objects of these statements are suitable as graphite values.
-        graphitePredicates = {ROOM['temperatureF']}
-        # bug: one sensor can have temp and humid- this will be ambiguous
-        for s, graphiteName in self.graph.subject_objects(ROOM['graphiteName']):
-            for group in self._statementsFromInputs.values():
-                for stmt in group:
-                    if stmt[0] == s and stmt[1] in graphitePredicates:
-                        log.debug('  sending %s -> %s', stmt[0], graphiteName)
-                        self._carbon.send(graphiteName, stmt[2].toPython(), now)
-
     def outputStatements(self, stmts):
         unused = set(stmts)
         for dev in self._devs:
--- a/service/piNode/requirements.txt	Mon Aug 01 02:24:50 2016 -0700
+++ b/service/piNode/requirements.txt	Mon Aug 01 02:26:38 2016 -0700
@@ -5,3 +5,8 @@
 python-dateutil
 w1thermsensor
 service_identity==16.0.0
+git+git://github.com/adafruit/Adafruit_Python_DHT#egg=DHT
+http://abyz.co.uk/rpi/pigpio/pigpio.zip
+git+git://github.com/adafruit/Adafruit_Nokia_LCD#egg=Nokia_LCD
+RPi.GPIO==0.6.2
+influxdb==3.0.0