diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 780:729ab70c6212

reformat, update build
author drewp@bigasterisk.com
date Sat, 08 Aug 2020 15:06:22 -0700
parents f3607a373a00
children 13970578a443
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Aug 08 14:02:46 2020 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sat Aug 08 15:06:22 2020 -0700
@@ -2,31 +2,34 @@
 Subscribe to mqtt topics; generate RDF statements.
 """
 import json
-import sys
 from pathlib import Path
+
+import cyclone.web
 from docopt import docopt
-from rdflib import Namespace, URIRef, Literal, Graph, RDF, XSD
-from rdflib.parser import StringInputSource
-from rdflib.term import Node
-from twisted.internet import reactor
-import cyclone.web
-import rx, rx.operators, rx.scheduler.eventloop
+from export_to_influxdb import InfluxExporter
 from greplin import scales
 from greplin.scales.cyclonehandler import StatsHandler
-
-from export_to_influxdb import InfluxExporter
 from mqtt_client import MqttClient
-
-from patchablegraph import PatchableGraph, CycloneGraphHandler, CycloneGraphEventsHandler
-from rdfdb.patch import Patch
+from patchablegraph import (
+    CycloneGraphEventsHandler,
+    CycloneGraphHandler,
+    PatchableGraph,
+)
 from rdfdb.rdflibpatch import graphFromQuads
+from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD
+from rdflib.term import Node
+import rx
+import rx.operators
+import rx.scheduler.eventloop
 from standardservice.logsetup import log, verboseLogging
 from standardservice.scalessetup import gatherProcessStats
+from twisted.internet import reactor
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 gatherProcessStats()
 
+
 def parseDurationLiteral(lit: Literal) -> float:
     if lit.endswith('s'):
         return float(lit.split('s')[0])
@@ -34,11 +37,12 @@
 
 
 class MqttStatementSource:
+
     def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
-        self.mqtt = mqtt # deprecated
+        self.mqtt = mqtt  # deprecated
         self.internalMqtt = internalMqtt
         self.influx = influx
 
@@ -47,9 +51,7 @@
 
         statPath = '/subscribed_topic/' + self.mqttTopic.decode('ascii').replace('/', '|')
         scales.init(self, statPath)
-        self._mqttStats = scales.collection(
-            statPath + '/incoming', scales.IntStat('count'),
-            scales.RecentFpsStat('fps'))
+        self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
 
         rawBytes = self.subscribeMqtt(self.mqttTopic)
         rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
@@ -60,8 +62,7 @@
             parsed = self.conversionStep(conv)(parsed)
 
         outputQuadsSets = rx.combine_latest(
-            *[self.makeQuads(parsed, plan)
-              for plan in g.objects(self.uri, ROOM['graphStatements'])])
+            *[self.makeQuads(parsed, plan) for plan in g.objects(self.uri, ROOM['graphStatements'])])
 
         outputQuadsSets.subscribe_(self.updateQuads)
 
@@ -69,7 +70,6 @@
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
 
-
     def subscribeMqtt(self, topic):
         # goal is to get everyone on the internal broker and eliminate this
         mqtt = self.internalMqtt if topic.startswith(b'frontdoorlock') else self.mqtt
@@ -111,13 +111,9 @@
 
     def makeQuads(self, parsed, plan):
         g = self.config
+
         def quadsFromValue(valueNode):
-            return set([
-                (self.uri,
-                 g.value(plan, ROOM['outputPredicate']),
-                 valueNode,
-                 self.uri)
-            ])
+            return set([(self.uri, g.value(plan, ROOM['outputPredicate']), valueNode, self.uri)])
 
         def emptyQuads(element):
             return set([])
@@ -131,7 +127,7 @@
                 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)),
                 rx.operators.map(emptyQuads),
                 rx.operators.merge(quads),
-                )
+            )
 
         return quads
 
@@ -177,29 +173,41 @@
 
     masterGraph = PatchableGraph()
 
-    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
-                      brokerPort=1883) # deprecated
-    internalMqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='bang',
-                              brokerPort=10010)
+    mqtt = MqttClient(clientId='mqtt_to_rdf', brokerHost='mosquitto-ext.default.svc.cluster.local', brokerPort=1883)  # deprecated
+    internalMqtt = MqttClient(clientId='mqtt_to_rdf',
+                              brokerHost='mosquitto-frontdoor.default.svc.cluster.local',
+                              brokerPort=10210)
     influx = InfluxExporter(config)
 
     srcs = []
     for src in config.subjects(RDF.type, ROOM['MqttStatementSource']):
-        srcs.append(MqttStatementSource(src, config, masterGraph, 
-        mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
+        srcs.append(MqttStatementSource(src, config, masterGraph, mqtt=mqtt, internalMqtt=internalMqtt, influx=influx))
     log.info(f'set up {len(srcs)} sources')
 
     port = 10018
-    reactor.listenTCP(port, cyclone.web.Application([
-        (r"/()", cyclone.web.StaticFileHandler,
-         {"path": ".", "default_filename": "index.html"}),
-        (r"/build/(bundle.js)",
-         cyclone.web.StaticFileHandler, {"path": "build"}),
-        (r'/stats/(.*)', StatsHandler, {'serverName': 'mqtt_to_rdf'}),
-        (r"/graph/mqtt", CycloneGraphHandler, {'masterGraph': masterGraph}),
-        (r"/graph/mqtt/events", CycloneGraphEventsHandler,
-         {'masterGraph': masterGraph}),
-        ], mqtt=mqtt, internalMqtt=internalMqtt, masterGraph=masterGraph, debug=arg['-v']),
+    reactor.listenTCP(port,
+                      cyclone.web.Application([
+                          (r"/()", cyclone.web.StaticFileHandler, {
+                              "path": ".",
+                              "default_filename": "index.html"
+                          }),
+                          (r"/build/(bundle.js)", cyclone.web.StaticFileHandler, {
+                              "path": "build"
+                          }),
+                          (r'/stats/(.*)', StatsHandler, {
+                              'serverName': 'mqtt_to_rdf'
+                          }),
+                          (r"/graph/mqtt", CycloneGraphHandler, {
+                              'masterGraph': masterGraph
+                          }),
+                          (r"/graph/mqtt/events", CycloneGraphEventsHandler, {
+                              'masterGraph': masterGraph
+                          }),
+                      ],
+                                              mqtt=mqtt,
+                                              internalMqtt=internalMqtt,
+                                              masterGraph=masterGraph,
+                                              debug=arg['-v']),
                       interface='::')
     log.warn('serving on %s', port)