Mercurial > code > home > repos > homeauto
changeset 796:fc74ae6d5d68
rm influx (was broken); write some data to prometheus
author | drewp@bigasterisk.com |
---|---|
date | Tue, 29 Dec 2020 20:53:23 -0800 |
parents | c8562ace4917 |
children | a3e430b39177 |
files | service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt |
diffstat | 2 files changed, 33 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Sun Dec 27 03:29:18 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Dec 29 20:53:23 2020 -0800 @@ -7,9 +7,6 @@ import cyclone.web from docopt import docopt -from export_to_influxdb import InfluxExporter -from greplin import scales -from greplin.scales.cyclonehandler import StatsHandler from mqtt_client import MqttClient from patchablegraph import ( CycloneGraphEventsHandler, @@ -24,15 +21,19 @@ import rx.operators import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging -from standardservice.scalessetup import gatherProcessStats from twisted.internet import reactor +import prometheus_client +from prometheus_client import Counter, Gauge, Histogram +from prometheus_client import Summary +from prometheus_client.exposition import generate_latest +from prometheus_client.registry import REGISTRY from button_events import button_events ROOM = Namespace('http://projects.bigasterisk.com/room/') -gatherProcessStats() +collectors = {} def parseDurationLiteral(lit: Literal) -> float: if lit.endswith('s'): @@ -42,20 +43,19 @@ class MqttStatementSource: - def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt): self.uri = uri self.config = config self.masterGraph = masterGraph self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt - self.influx = influx self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') - scales.init(self, statPath) - self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) + #scales.init(self, statPath) + #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) rawBytes = self.addFilters(rawBytes) @@ -93,8 +93,8 @@ return mqtt.subscribe(topic) def countIncomingMessage(self, _): - self._mqttStats.fps.mark() - self._mqttStats.count += 1 + pass #self._mqttStats.fps.mark() + #self._mqttStats.count += 1 def getParser(self): g = self.config @@ -168,7 +168,18 @@ g = graphFromQuads(newQuads) log.debug(f'{self.uri} update to {len(newQuads)} statements') - self.influx.exportToInflux(newQuads) + for quad in newQuads: + meas = quad[0].split('/')[-1] + if meas.startswith('airQuality'): + where_prefix, type_ = meas[len('airQuality'):].split('door') + where = where_prefix + 'door' + metric = 'air' + tags = {'loc': where.lower(), 'type': type_.lower()} + val = quad[2].toPython() + if metric not in collectors: + collectors[metric] = Gauge(metric, 'measurement', labelnames=tags.keys()) + + collectors[metric].labels(**tags).set(val) self.masterGraph.patchSubgraph(self.uri, g) @@ -186,6 +197,13 @@ raise KeyError(value) +class Metrics(cyclone.web.RequestHandler): + + def get(self): + self.add_header('content-type', 'text/plain') + self.write(generate_latest(REGISTRY)) + + if __name__ == '__main__': arg = docopt(""" Usage: mqtt_to_rdf.py [options] @@ -209,11 +227,10 @@ internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-frontdoor.default.svc.cluster.local', brokerPort=10210) - influx = InfluxExporter(config, influxHost='influxdb.default.svc.cluster.local') srcs = [] for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): - srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx)) + srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt)) log.info(f'set up {len(srcs)} sources') port = 10018 @@ -226,15 +243,13 @@ (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, { "path": "build" }), - (r'/stats/(.*)', StatsHandler, { - 'serverName': 'mqtt_to_rdf' - }), (r"/graph/mqtt", CycloneGraphHandler, { 'masterGraph': masterGraph }), (r"/graph/mqtt/events", CycloneGraphEventsHandler, { 'masterGraph': masterGraph }), + (r'/metrics', Metrics), ], mqtt=mqtt, internalMqtt=internalMqtt,
--- a/service/mqtt_to_rdf/requirements.txt Sun Dec 27 03:29:18 2020 -0800 +++ b/service/mqtt_to_rdf/requirements.txt Tue Dec 29 20:53:23 2020 -0800 @@ -3,9 +3,9 @@ rdflib==4.2.2 service_identity==18.1.0 twisted-mqtt==0.3.9 -git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales rx==3.1.1 docopt +prometheus_client==0.8.0 cycloneerr export_to_influxdb==0.4.0