diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 799:e0e623c01a69

ts build is part of docker now; new web debug console
author drewp@bigasterisk.com
date Fri, 01 Jan 2021 14:17:12 -0800
parents cdc76c84e3e2
children 6ddc5e037f15
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Tue Dec 29 21:05:32 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Fri Jan 01 14:17:12 2021 -0800
@@ -1,11 +1,14 @@
 """
 Subscribe to mqtt topics; generate RDF statements.
 """
+import time
 import json
+from logging import debug
 from pathlib import Path
 from typing import Callable, cast
 
 import cyclone.web
+import cyclone.sse
 import prometheus_client
 import rx
 import rx.operators
@@ -21,7 +24,7 @@
 from rdflib.term import Node
 from rx.core import Observable
 from standardservice.logsetup import log, verboseLogging
-from twisted.internet import reactor
+from twisted.internet import reactor, task
 
 from button_events import button_events
 
@@ -30,6 +33,11 @@
 collectors = {}
 
 
+def appendLimit(lst, elem, n=10):
+    del lst[:len(lst) - n + 1]
+    lst.append(elem)
+
+
 def parseDurationLiteral(lit: Literal) -> float:
     if lit.endswith('s'):
         return float(lit.split('s')[0])
@@ -38,33 +46,43 @@
 
 class MqttStatementSource:
 
-    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt):
+    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, debugPageData):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
+        self.debugPageData = debugPageData
         self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
 
         self.mqttTopic = self.topicFromConfig(self.config)
         log.debug(f'new mqttTopic {self.mqttTopic}')
 
-        statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
-        #scales.init(self, statPath)
-        #self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
-
-        rawBytes = self.subscribeMqtt(self.mqttTopic)
-        rawBytes = self.addFilters(rawBytes)
-        rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
-        parsed = self.getParser()(rawBytes)
+        self.debugSub = {
+            'topic': self.mqttTopic.decode('ascii'),
+            'recentMessages': [],
+            'recentParsed': [],
+            'recentConversions': [],
+            'currentMetrics': [],
+            'currentOutputGraph': {
+                't': 1,
+                'n3': "(n3)"
+            },
+        }
+        self.debugPageData['subscribed'].append(self.debugSub)
 
-        g = self.config
-        for conv in g.items(g.value(self.uri, ROOM['conversions'])):
-            parsed = self.conversionStep(conv)(parsed)
+        rawBytes: Observable = self.subscribeMqtt(self.mqttTopic)
+        # rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
+        rawBytes.subscribe(on_next=self.countIncomingMessage)
+        # rawBytes = self.addFilters(rawBytes)
+        # parsed = self.getParser()(rawBytes)
 
-        outputQuadsSets = rx.combine_latest(
-            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
+        # g = self.config
+        # for conv in g.items(g.value(self.uri, ROOM['conversions'])):
+        #     parsed = self.conversionStep(conv)(parsed)
 
-        outputQuadsSets.subscribe_(self.updateQuads)
+        # outputQuadsSets = rx.combine_latest(            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
+
+        # outputQuadsSets.subscribe_(self.updateQuads)
 
     def addFilters(self, rawBytes):
         jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
@@ -87,9 +105,13 @@
         mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
         return mqtt.subscribe(topic)
 
-    def countIncomingMessage(self, _):
-        pass  #self._mqttStats.fps.mark()
-        #self._mqttStats.count += 1
+    def countIncomingMessage(self, msg: bytes):
+        self.debugPageData['messagesSeen'] += 1
+
+        appendLimit(self.debugSub['recentMessages'], {
+            't': round(time.time(), 3),
+            'msg': msg.decode('ascii'),
+        })
 
     def getParser(self):
         g = self.config
@@ -199,6 +221,32 @@
         self.write(generate_latest(REGISTRY))
 
 
+class DebugPageData(cyclone.sse.SSEHandler):
+
+    def __init__(self, application, request):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.lastSent = None
+
+    def watch(self):
+        try:
+            dpd = self.settings.debugPageData
+            js = json.dumps(dpd, sort_keys=True)
+            if js != self.lastSent:
+                print('sending dpd update')
+                self.sendEvent(message=js)
+                self.lastSent = js
+        except Exception:
+            import traceback
+            traceback.print_exc()
+
+    def bind(self):
+        self.loop = task.LoopingCall(self.watch)
+        self.loop.start(1, now=True)
+
+    def unbind(self):
+        self.loop.stop()
+
+
 if __name__ == '__main__':
     arg = docopt("""
     Usage: mqtt_to_rdf.py [options]
@@ -218,14 +266,23 @@
 
     masterGraph = PatchableGraph()
 
+    brokerHost = 'mosquitto-frontdoor.default.svc.cluster.local'
+    brokerPort = 10210
+
+    debugPageData = {
+        # schema in index.ts
+        'server': f'{brokerHost}:{brokerPort}',
+        'messagesSeen': 0,
+        'subscribed': [],
+    }
+
     mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
-    internalMqtt = MqttClient(clientId='mqtt_to_rdf',
-                              brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
-                              brokerPort=10210)
+    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost=brokerHost, brokerPort=brokerPort)
 
     srcs = []
-    for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
-        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt))
+    for src in sorted(config.subjects(RDF.type, ROOM['MqttStatementSource'])):
+        srcs.append(
+            MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, debugPageData=debugPageData))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
@@ -244,11 +301,13 @@
                           (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
                               'masterGraph': masterGraph
                           }),
+                          (r'/debugPageData', DebugPageData),
                           (r'/metrics', Metrics),
                       ],
                                               mqtt=mqtt,
                                               internalMqtt=internalMqtt,
                                               masterGraph=masterGraph,
+                                              debugPageData=debugPageData,
                                               debug=arg['-v']),
                       interface='::')
     log.warn('serving on %s', port)