diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 791:8f4e814eb1ab

cleanup
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:39:30 -0800
parents 13970578a443
children c3e3bd5dfa0b
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:36:57 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:39:30 2020 -0800
@@ -19,6 +19,7 @@
 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD
 from rdflib.term import Node
 import rx
+from rx.core import Observable
 import rx.operators
 import rx.scheduler.eventloop
 from standardservice.logsetup import log, verboseLogging
@@ -38,7 +39,7 @@
 
 class MqttStatementSource:
 
-    def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx):
+    def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx):
         self.uri = uri
         self.config = config
         self.masterGraph = masterGraph
@@ -81,6 +82,7 @@
 
     def getParser(self):
         g = self.config
+
         parser = g.value(self.uri, ROOM['parser'])
         if parser == XSD.double:
             return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
@@ -102,7 +104,11 @@
     def conversionStep(self, conv: Node):
         g = self.config
         if conv == ROOM['celsiusToFarenheit']:
-            return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2)))
+
+            def c2f(value: Literal) -> Node:
+                return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
+
+            return rx.operators.map(c2f)
         elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
             threshold = g.value(conv, ROOM['ignoreValueBelow'])
             return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
@@ -123,8 +129,9 @@
         dur = g.value(plan, ROOM['statementLifetime'])
         if dur is not None:
             sec = parseDurationLiteral(dur)
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
             quads = quads.pipe(
-                rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)),
+                rx.operators.debounce(sec, loop),
                 rx.operators.map(emptyQuads),
                 rx.operators.merge(quads),
             )