diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 1583:b0608eb6e90c

dead code, sort reqs
author drewp@bigasterisk.com
date Sun, 29 Aug 2021 13:43:14 -0700
parents 6ddc5e037f15
children 1c36ad1eb8b3
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Sun Aug 29 13:36:08 2021 -0700
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Sun Aug 29 13:43:14 2021 -0700
@@ -8,6 +8,7 @@
 from pathlib import Path
 from typing import Callable, Sequence, Set, Tuple, Union, cast
 from cyclone.util import ObjectDict
+from rdflib.graph import ConjunctiveGraph
 
 from rx.core.typing import Mapper
 
@@ -33,15 +34,18 @@
 from twisted.internet import reactor, task
 from dataclasses import dataclass
 from button_events import button_events
+from patch_cyclone_sse import patchCycloneSse
 
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
-
+MESSAGES_SEEN = Counter('mqtt_messages_seen', '')
 collectors = {}
 
 import export_to_influxdb
 
 print(f'merge me back {export_to_influxdb}')
 
+patchCycloneSse()
+
 
 def appendLimit(lst, elem, n=10):
     del lst[:len(lst) - n + 1]
@@ -168,8 +172,11 @@
 class Rdfizer(StreamPipelineStep):
 
     def makeOutputStream(self, inStream: Observable) -> Observable:
-        outputQuadsSets = rx.combine_latest(
-            *[self.makeQuads(inStream, plan) for plan in self.config.objects(self.uri, ROOM['graphStatements'])])
+        plans = list(self.config.objects(self.uri, ROOM['graphStatements']))
+        log.debug(f'{self.uri=} has {len(plans)=}')
+        if not plans:
+            return rx.empty()
+        outputQuadsSets = rx.combine_latest(*[self.makeQuads(inStream, plan) for plan in plans])
         return outputQuadsSets
 
     def makeQuads(self, inStream: Observable, plan: URIRef) -> Observable:
@@ -203,8 +210,8 @@
     return node.n3().replace('http://www.w3.org/2001/XMLSchema#', 'xsd:')
 
 
-def serializeWithNs(graph: PatchableGraph) -> bytes:
-    graph._graph.bind('', 'http://projects.bigasterisk.com/room/')
+def serializeWithNs(graph: ConjunctiveGraph) -> bytes:
+    graph.bind('', ROOM)
     return cast(bytes, graph.serialize(format='n3'))
 
 
@@ -248,6 +255,7 @@
         convertedObjs.subscribe_(lambda v: appendLimit(self.debugSub['recentConversions'], {'t': truncTime(), 'n3': tightN3(v)}))
 
         outputQuadsSets = Rdfizer(uri, config).makeOutputStream(convertedObjs)
+        outputQuadsSets.subscribe_(self.updateInflux)
 
         outputQuadsSets.subscribe_(self.updateMasterGraph)
 
@@ -262,12 +270,17 @@
 
     def countIncomingMessage(self, msg: bytes):
         self.debugPageData['messagesSeen'] += 1
+        MESSAGES_SEEN.inc()
 
         appendLimit(self.debugSub['recentMessages'], {
             't': truncTime(),
             'msg': msg.decode('ascii'),
         })
 
+    def updateInflux(self, newGraphs):
+        for g in newGraphs:
+            self.influxExport.exportToInflux(g)
+
     def updateMasterGraph(self, newGraphs):
         newQuads = set.union(*newGraphs)
         g = graphFromQuads(newQuads)
@@ -287,7 +300,7 @@
                 collectors[metric].labels(**tags).set(val)
 
         self.masterGraph.patchSubgraph(self.uri, g)
-        self.debugSub['currentOutputGraph']['n3'] = cast(bytes, self.masterGraph.serialize(format='n3')).decode('utf8')
+        self.debugSub['currentOutputGraph']['n3'] = serializeWithNs(g).decode('utf8')
 
 
 class Metrics(cyclone.web.RequestHandler):