Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 799:e0e623c01a69
ts build is part of docker now; new web debug console
author | drewp@bigasterisk.com |
---|---|
date | Fri, 01 Jan 2021 14:17:12 -0800 |
parents | cdc76c84e3e2 |
children | 6ddc5e037f15 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Tue Dec 29 21:05:32 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Fri Jan 01 14:17:12 2021 -0800 @@ -1,11 +1,14 @@ """ Subscribe to mqtt topics; generate RDF statements. """ +import time import json +from logging import debug from pathlib import Path from typing import Callable, cast import cyclone.web +import cyclone.sse import prometheus_client import rx import rx.operators @@ -21,7 +24,7 @@ from rdflib.term import Node from rx.core import Observable from standardservice.logsetup import log, verboseLogging -from twisted.internet import reactor +from twisted.internet import reactor, task from button_events import button_events @@ -30,6 +33,11 @@ collectors = {} +def appendLimit(lst, elem, n=10): + del lst[:len(lst) - n + 1] + lst.append(elem) + + def parseDurationLiteral(lit: Literal) -> float: if lit.endswith('s'): return float(lit.split('s')[0]) @@ -38,33 +46,43 @@ class MqttStatementSource: - def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData): self.uri = uri self.config = config self.masterGraph = masterGraph + self.debugPageData = debugPageData self.mqtt = mqtt # deprecated self.internalMqtt = internalMqtt self.mqttTopic = self.topicFromConfig(self.config) log.debug(f'new mqttTopic {self.mqttTopic}') - statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|') - #scales.init(self, statPath) - #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) - - rawBytes = self.subscribeMqtt(self.mqttTopic) - rawBytes = self.addFilters(rawBytes) - rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) - parsed = self.getParser()(rawBytes) + self.debugSub = { + 'topic': self.mqttTopic.decode('ascii'), + 'recentMessages': [], + 'recentParsed': [], + 'recentConversions': [], + 'currentMetrics': [], + 'currentOutputGraph': { + 't': 1, + 'n3': "(n3)" + }, + } + self.debugPageData['subscribed'].append(self.debugSub) - g = self.config - for conv in g.items(g.value(self.uri, ROOM['conversions'])): - parsed = self.conversionStep(conv)(parsed) + rawBytes: Observable = self.subscribeMqtt(self.mqttTopic) + # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) + rawBytes.subscribe(on_next=self.countIncomingMessage) + # rawBytes = self.addFilters(rawBytes) + # parsed = self.getParser()(rawBytes) - outputQuadsSets = rx.combine_latest( - *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) + # g = self.config + # for conv in g.items(g.value(self.uri, ROOM['conversions'])): + # parsed = self.conversionStep(conv)(parsed) - outputQuadsSets.subscribe_(self.updateQuads) + # outputQuadsSets = rx.combine_latest( *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])]) + + # outputQuadsSets.subscribe_(self.updateQuads) def addFilters(self, rawBytes): jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) @@ -87,9 +105,13 @@ mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt return mqtt.subscribe(topic) - def countIncomingMessage(self, _): - pass #self._mqttStats.fps.mark() - #self._mqttStats.count += 1 + def countIncomingMessage(self, msg: bytes): + self.debugPageData['messagesSeen'] += 1 + + appendLimit(self.debugSub['recentMessages'], { + 't': round(time.time(), 3), + 'msg': msg.decode('ascii'), + }) def getParser(self): g = self.config @@ -199,6 +221,32 @@ self.write(generate_latest(REGISTRY)) +class DebugPageData(cyclone.sse.SSEHandler): + + def __init__(self, application, request): + cyclone.sse.SSEHandler.__init__(self, application, request) + self.lastSent = None + + def watch(self): + try: + dpd = self.settings.debugPageData + js = json.dumps(dpd, sort_keys=True) + if js != self.lastSent: + print('sending dpd update') + self.sendEvent(message=js) + self.lastSent = js + except Exception: + import traceback + traceback.print_exc() + + def bind(self): + self.loop = task.LoopingCall(self.watch) + self.loop.start(1, now=True) + + def unbind(self): + self.loop.stop() + + if __name__ == '__main__': arg = docopt(""" Usage: mqtt_to_rdf.py [options] @@ -218,14 +266,23 @@ masterGraph = PatchableGraph() + brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local' + brokerPort = 10210 + + debugPageData = { + # schema in index.ts + 'server': f'{brokerHost}:{brokerPort}', + 'messagesSeen': 0, + 'subscribed': [], + } + mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883) # deprecated - internalMqtt = MqttClient(clientId='mqtt_to_rdf', - brokerHost='mosquitto-frontdoor.default.svc.cluster.local', - brokerPort=10210) + internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort) srcs = [] - for src in config.subjects(RDF.type, ROOM['MqttStatementSource']): - srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt)) + for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])): + srcs.append( + MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData)) log.info(f'set up {len(srcs)} sources') port = 10018 @@ -244,11 +301,13 @@ (r"/graph/mqtt/events", CycloneGraphEventsHandler, { 'masterGraph': masterGraph }), + (r'/debugPageData', DebugPageData), (r'/metrics', Metrics), ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, + debugPageData=debugPageData, debug=arg['-v']), interface='::') log.warn('serving on %s', port)