changeset 1232:b50a13ef20ba

exportStats for sending scales data to influxdb Ignore-this: 715ff40fed97559efa37edf0efa58220 darcs-hash:bcdd9713c4bf9c2ebe732ae1d30edc44f1be704f
author drewp <drewp@bigasterisk.com>
date Sun, 07 Apr 2019 03:56:17 -0700
parents 996a00d72979
children 5d97ed6118db
files lib/export_to_influxdb.py lib/patchablegraph.py lib/patchsource.py
diffstat 3 files changed, 64 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/lib/export_to_influxdb.py	Thu Apr 04 02:22:04 2019 -0700
+++ b/lib/export_to_influxdb.py	Sun Apr 07 03:56:17 2019 -0700
@@ -1,21 +1,76 @@
-import time, logging
+import time, logging, math, os, sys, socket
 from influxdb import InfluxDBClient
 from rdflib import Namespace
+from twisted.internet import task
 
 log = logging.getLogger()
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
+class RetentionPolicies(object):
+    def __init__(self, influx):
+        self.influx = influx
+        self.createdPolicies = set() # days
+
+    def getCreatedPolicy(self, days):
+        name = 'ret_%d_day' % days
+        if days not in self.createdPolicies:
+            self.influx.create_retention_policy(name,
+                                                duration='%dd' % days,
+                                                replication='1')
+            self.createdPolicies.add(days)
+        return name
+        
 class InfluxExporter(object):
     def __init__(self, configGraph, influxHost='bang6'):
         self.graph = configGraph
         self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main')
+        self.retentionPolicies = RetentionPolicies(self.influx)
+        self.lastSent = {}
         self.lastExport = 0
-        self.lastSent = {}  # (subj, measurementName, tags) : (time, value)
 
         self.measurements = {} # (subj, predicate) : measurement
         for s, m in self.graph.subject_objects(ROOM['influxMeasurement']):
             self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m
-        
+
+    def exportStats(self, stats, paths, period_secs=10, retain_days=7):
+        # graphite version of this in scales/graphite.py
+        base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]]
+        tags  = {'host': socket.gethostname()}
+        def send():
+            now = int(time.time())
+            points = []
+            def getVal(path):
+                x = stats
+                comps = path.split('.')[1:]
+                for comp in comps:
+                    x2 = x
+                    x = getattr(x, comp, None)
+                    if x is None:
+                        x = x2[comp]
+                        if x is None:
+                            print("no path %s" % path)
+                            return
+                if math.isnan(x):
+                    return
+                points.append({
+                    'measurement': '.'.join(base + comps[:-1]),
+                    "tags": tags,
+                    "fields": {comps[-1]: x},
+                    "time": now
+                })
+            for path in paths:
+                getVal(path)
+            if points:
+                self.influx.write_points(
+                    points, time_precision='s',
+                    retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days))
+                if self.lastExport == 0:
+                    log.info('writing stats to %r', points)
+                self.lastExport = now
+                #print('send %r' % points)
+            
+        task.LoopingCall(send).start(period_secs, now=False)
+            
     def exportToInflux(self, currentStatements):
         graph = self.graph
         now = int(time.time())
--- a/lib/patchablegraph.py	Thu Apr 04 02:22:04 2019 -0700
+++ b/lib/patchablegraph.py	Sun Apr 07 03:56:17 2019 -0700
@@ -138,13 +138,13 @@
     def bind(self):
         graphJson = self.masterGraph.asJsonLd()
         log.debug("send fullGraph event: %s", graphJson)
-        self.sendEvent(message=graphJson, event='fullGraph')
+        self.sendEvent(message=graphJson, event=b'fullGraph')
         self.masterGraph.addObserver(self.onPatch)
 
     def onPatch(self, patchJson):
         # throttle and combine patches here- ideally we could see how
         # long the latency to the client is to make a better rate choice
-        self.sendEvent(message=patchJson, event='patch')
+        self.sendEvent(message=patchJson, event=b'patch')
                
     def unbind(self):
         self.masterGraph.removeObserver(self.onPatch)
--- a/lib/patchsource.py	Thu Apr 04 02:22:04 2019 -0700
+++ b/lib/patchsource.py	Sun Apr 07 03:56:17 2019 -0700
@@ -109,10 +109,11 @@
 
     todo: generate connection stmts in here
     """
-    def __init__(self, url, listener):
+    def __init__(self, url, listener, reconnectSecs=60):
         self.url = url
         self._stopped = False
         self._listener = listener
+        self.reconnectSecs = reconnectSecs
         self._reconnect()
 
     def _reconnect(self):
@@ -136,8 +137,8 @@
         self._ps.stop()
         
     def _onConnectionFailed(self, arg):
-        reactor.callLater(60, self._reconnect)
+        reactor.callLater(self.reconnectSecs, self._reconnect)
         
     def _onConnectionLost(self, arg):
-        reactor.callLater(60, self._reconnect)        
+        reactor.callLater(self.reconnectSecs, self._reconnect)