changeset 796:fc74ae6d5d68

rm influx (was broken); write some data to prometheus
author drewp@bigasterisk.com
date Tue, 29 Dec 2020 20:53:23 -0800
parents c8562ace4917
children a3e430b39177
files service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt
diffstat 2 files changed, 33 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sun Dec 27 03:29:18 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Dec 29 20:53:23 2020 -0800
@@ -7,9 +7,6 @@
 
 import cyclone.web
 from docopt import docopt
-from export_to_influxdb import InfluxExporter
-from greplin import scales
-from greplin.scales.cyclonehandler import StatsHandler
 from mqtt_client import MqttClient
 from patchablegraph import (
     CycloneGraphEventsHandler,
@@ -24,15 +21,19 @@
 import rx.operators
 import rx.scheduler.eventloop
 from standardservice.logsetup import log, verboseLogging
-from standardservice.scalessetup import gatherProcessStats
 from twisted.internet import reactor
+import prometheus_client
+from prometheus_client import Counter, Gauge, Histogram
+from prometheus_client import Summary
+from prometheus_client.exposition import generate_latest
+from prometheus_client.registry import REGISTRY
 
 from button_events import button_events
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
-gatherProcessStats()
 
+collectors = {}
 
 def parseDurationLiteral(lit: Literal) -> float:
     if lit.endswith('s'):
@@ -42,20 +43,19 @@
 
 class MqttStatementSource:
 
-    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx):
+    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
-        self.influx = influx
 
         self.mqttTopic = self.topicFromConfig(self.config)
         log.debug(f'new mqttTopic {self.mqttTopic}')
 
         statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
-        scales.init(self, statPath)
-        self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
+        #scales.init(self, statPath)
+        #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
 
         rawBytes = self.subscribeMqtt(self.mqttTopic)
         rawBytes = self.addFilters(rawBytes)
@@ -93,8 +93,8 @@
         return mqtt.subscribe(topic)
 
     def countIncomingMessage(self, _):
-        self._mqttStats.fps.mark()
-        self._mqttStats.count += 1
+        pass  #self._mqttStats.fps.mark()
+        #self._mqttStats.count += 1
 
     def getParser(self):
         g = self.config
@@ -168,7 +168,18 @@
         g = graphFromQuads(newQuads)
         log.debug(f'{self.uri} update to {len(newQuads)} statements')
 
-        self.influx.exportToInflux(newQuads)
+        for quad in newQuads:
+            meas = quad[0].split('/')[-1]
+            if meas.startswith('airQuality'):
+                where_prefix, type_ = meas[len('airQuality'):].split('door')
+                where = where_prefix + 'door'
+                metric = 'air'
+                tags = {'loc': where.lower(), 'type': type_.lower()}
+                val = quad[2].toPython()
+                if metric not in collectors:
+                    collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys())
+
+                collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, g)
 
@@ -186,6 +197,13 @@
         raise KeyError(value)
 
 
+class Metrics(cyclone.web.RequestHandler):
+
+    def get(self):
+        self.add_header('content-type', 'text/plain')
+        self.write(generate_latest(REGISTRY))
+
+
 if __name__ == '__main__':
     arg = docopt("""
     Usage: mqtt_to_rdf.py [options]
@@ -209,11 +227,10 @@
     internalMqtt = MqttClient(clientId='mqtt_to_rdf',
                               brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
                               brokerPort=10210)
-    influx = InfluxExporter(config, influxHost='influxdb.default.svc.cluster.local')
 
     srcs = []
     for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
-        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
+        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
@@ -226,15 +243,13 @@
                           (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
                               "path": "build"
                           }),
-                          (r'/stats/(.*)', StatsHandler, {
-                              'serverName': 'mqtt_to_rdf'
-                          }),
                           (r"/graph/mqtt", CycloneGraphHandler, {
                               'masterGraph': masterGraph
                           }),
                           (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
                               'masterGraph': masterGraph
                           }),
+                          (r'/metrics', Metrics),
                       ],
                                               mqtt=mqtt,
                                               internalMqtt=internalMqtt,
--- a/service/mqtt_to_rdf/requirements.txt	Sun Dec 27 03:29:18 2020 -0800
+++ b/service/mqtt_to_rdf/requirements.txt	Tue Dec 29 20:53:23 2020 -0800
@@ -3,9 +3,9 @@
 rdflib==4.2.2
 service_identity==18.1.0
 twisted-mqtt==0.3.9
-git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales
 rx==3.1.1
 docopt
+prometheus_client==0.8.0
 
 cycloneerr
 export_to_influxdb==0.4.0