Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 791:8f4e814eb1ab
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:39:30 -0800 |
parents | 13970578a443 |
children | c3e3bd5dfa0b |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:36:57 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:39:30 2020 -0800 @@ -19,6 +19,7 @@ from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD from rdflib.term import Node import rx +from rx.core import Observable import rx.operators import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging @@ -38,7 +39,7 @@ class MqttStatementSource: - def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph @@ -81,6 +82,7 @@ def getParser(self): g = self.config + parser = g.value(self.uri, ROOM['parser']) if parser == XSD.double: return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) @@ -102,7 +104,11 @@ def conversionStep(self, conv: Node): g = self.config if conv == ROOM['celsiusToFarenheit']: - return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) + + def c2f(value: Literal) -> Node: + return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) + + return rx.operators.map(c2f) elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: threshold = g.value(conv, ROOM['ignoreValueBelow']) return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) @@ -123,8 +129,9 @@ dur = g.value(plan, ROOM['statementLifetime']) if dur is not None: sec = parseDurationLiteral(dur) + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) quads = quads.pipe( - rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), + rx.operators.debounce(sec, loop), rx.operators.map(emptyQuads), rx.operators.merge(quads), )