# HG changeset patch # User drewp@bigasterisk.com # Date 1554634577 25200 # Node ID fcd2c026f51e0a582138b24dfb890b8307ac7267 # Parent d6a6076fce6e63b99d7b34d78feb797cc79b95ae exportStats for sending scales data to influxdb Ignore-this: 715ff40fed97559efa37edf0efa58220 diff -r d6a6076fce6e -r fcd2c026f51e lib/export_to_influxdb.py --- a/lib/export_to_influxdb.py Thu Apr 04 02:22:04 2019 -0700 +++ b/lib/export_to_influxdb.py Sun Apr 07 03:56:17 2019 -0700 @@ -1,21 +1,76 @@ -import time, logging +import time, logging, math, os, sys, socket from influxdb import InfluxDBClient from rdflib import Namespace +from twisted.internet import task log = logging.getLogger() ROOM = Namespace('http://projects.bigasterisk.com/room/') +class RetentionPolicies(object): + def __init__(self, influx): + self.influx = influx + self.createdPolicies = set() # days + + def getCreatedPolicy(self, days): + name = 'ret_%d_day' % days + if days not in self.createdPolicies: + self.influx.create_retention_policy(name, + duration='%dd' % days, + replication='1') + self.createdPolicies.add(days) + return name + class InfluxExporter(object): def __init__(self, configGraph, influxHost='bang6'): self.graph = configGraph self.influx = InfluxDBClient(influxHost, 9060, 'root', 'root', 'main') + self.retentionPolicies = RetentionPolicies(self.influx) + self.lastSent = {} self.lastExport = 0 - self.lastSent = {} # (subj, measurementName, tags) : (time, value) self.measurements = {} # (subj, predicate) : measurement for s, m in self.graph.subject_objects(ROOM['influxMeasurement']): self.measurements[(s, self.graph.value(m, ROOM['predicate']))] = m - + + def exportStats(self, stats, paths, period_secs=10, retain_days=7): + # graphite version of this in scales/graphite.py + base = ['stats', os.path.basename(sys.argv[0]).split('.py')[0]] + tags = {'host': socket.gethostname()} + def send(): + now = int(time.time()) + points = [] + def getVal(path): + x = stats + comps = path.split('.')[1:] + for comp in comps: + x2 = x + x = getattr(x, comp, None) + if x is None: + x = x2[comp] + if x is None: + print("no path %s" % path) + return + if math.isnan(x): + return + points.append({ + 'measurement': '.'.join(base + comps[:-1]), + "tags": tags, + "fields": {comps[-1]: x}, + "time": now + }) + for path in paths: + getVal(path) + if points: + self.influx.write_points( + points, time_precision='s', + retention_policy=self.retentionPolicies.getCreatedPolicy(days=retain_days)) + if self.lastExport == 0: + log.info('writing stats to %r', points) + self.lastExport = now + #print('send %r' % points) + + task.LoopingCall(send).start(period_secs, now=False) + def exportToInflux(self, currentStatements): graph = self.graph now = int(time.time()) diff -r d6a6076fce6e -r fcd2c026f51e lib/patchablegraph.py --- a/lib/patchablegraph.py Thu Apr 04 02:22:04 2019 -0700 +++ b/lib/patchablegraph.py Sun Apr 07 03:56:17 2019 -0700 @@ -138,13 +138,13 @@ def bind(self): graphJson = self.masterGraph.asJsonLd() log.debug("send fullGraph event: %s", graphJson) - self.sendEvent(message=graphJson, event='fullGraph') + self.sendEvent(message=graphJson, event=b'fullGraph') self.masterGraph.addObserver(self.onPatch) def onPatch(self, patchJson): # throttle and combine patches here- ideally we could see how # long the latency to the client is to make a better rate choice - self.sendEvent(message=patchJson, event='patch') + self.sendEvent(message=patchJson, event=b'patch') def unbind(self): self.masterGraph.removeObserver(self.onPatch) diff -r d6a6076fce6e -r fcd2c026f51e lib/patchsource.py --- a/lib/patchsource.py Thu Apr 04 02:22:04 2019 -0700 +++ b/lib/patchsource.py Sun Apr 07 03:56:17 2019 -0700 @@ -109,10 +109,11 @@ todo: generate connection stmts in here """ - def __init__(self, url, listener): + def __init__(self, url, listener, reconnectSecs=60): self.url = url self._stopped = False self._listener = listener + self.reconnectSecs = reconnectSecs self._reconnect() def _reconnect(self): @@ -136,8 +137,8 @@ self._ps.stop() def _onConnectionFailed(self, arg): - reactor.callLater(60, self._reconnect) + reactor.callLater(self.reconnectSecs, self._reconnect) def _onConnectionLost(self, arg): - reactor.callLater(60, self._reconnect) + reactor.callLater(self.reconnectSecs, self._reconnect)