Mercurial > code > home > repos > homeauto
changeset 791:8f4e814eb1ab
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:39:30 -0800 |
parents | b85986495848 |
children | 06583e0b5885 |
files | service/mqtt_to_rdf/mqtt_to_rdf.py service/mqtt_to_rdf/requirements.txt |
diffstat | 2 files changed, 12 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:36:57 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:39:30 2020 -0800 @@ -19,6 +19,7 @@ from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD from rdflib.term import Node import rx +from rx.core import Observable import rx.operators import rx.scheduler.eventloop from standardservice.logsetup import log, verboseLogging @@ -38,7 +39,7 @@ class MqttStatementSource: - def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): + def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx): self.uri = uri self.config = config self.masterGraph = masterGraph @@ -81,6 +82,7 @@ def getParser(self): g = self.config + parser = g.value(self.uri, ROOM['parser']) if parser == XSD.double: return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) @@ -102,7 +104,11 @@ def conversionStep(self, conv: Node): g = self.config if conv == ROOM['celsiusToFarenheit']: - return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) + + def c2f(value: Literal) -> Node: + return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) + + return rx.operators.map(c2f) elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: threshold = g.value(conv, ROOM['ignoreValueBelow']) return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) @@ -123,8 +129,9 @@ dur = g.value(plan, ROOM['statementLifetime']) if dur is not None: sec = parseDurationLiteral(dur) + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) quads = quads.pipe( - rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), + rx.operators.debounce(sec, loop), rx.operators.map(emptyQuads), rx.operators.merge(quads), )
--- a/service/mqtt_to_rdf/requirements.txt Mon Nov 30 23:36:57 2020 -0800 +++ b/service/mqtt_to_rdf/requirements.txt Mon Nov 30 23:39:30 2020 -0800 @@ -5,9 +5,10 @@ git+http://github.com/drewp/scales.git@550bcba42c5e96152ff5c5bd753fbb33ffdfe460#egg=scales git+https://github.com/ReactiveX/RxPY.git@6deb66e827f34a88b4605773d7671322b9cbbd08#egg=rx docopt + cycloneerr export_to_influxdb==0.4.0 mqtt_client==0.9.0 patchablegraph==0.11.0 rdfdb==0.21.0 -standardservice==0.6.0 +standardservice==0.6.0 \ No newline at end of file