Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 791:8f4e814eb1ab
cleanup
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:39:30 -0800 |
parents | 13970578a443 |
children | c3e3bd5dfa0b |
comparison
legend: equal | deleted | inserted | replaced
790:b85986495848 | 791:8f4e814eb1ab |
---|---|
17 ) | 17 ) |
18 from rdfdb.rdflibpatch import graphFromQuads | 18 from rdfdb.rdflibpatch import graphFromQuads |
19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD | 19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD |
20 from rdflib.term import Node | 20 from rdflib.term import Node |
21 import rx | 21 import rx |
22 from rx.core import Observable | |
22 import rx.operators | 23 import rx.operators |
23 import rx.scheduler.eventloop | 24 import rx.scheduler.eventloop |
24 from standardservice.logsetup import log, verboseLogging | 25 from standardservice.logsetup import log, verboseLogging |
25 from standardservice.scalessetup import gatherProcessStats | 26 from standardservice.scalessetup import gatherProcessStats |
26 from twisted.internet import reactor | 27 from twisted.internet import reactor |
36 raise NotImplementedError(f'duration literal: {lit}') | 37 raise NotImplementedError(f'duration literal: {lit}') |
37 | 38 |
38 | 39 |
39 class MqttStatementSource: | 40 class MqttStatementSource: |
40 | 41 |
41 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): | 42 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx): |
42 self.uri = uri | 43 self.uri = uri |
43 self.config = config | 44 self.config = config |
44 self.masterGraph = masterGraph | 45 self.masterGraph = masterGraph |
45 self.mqtt = mqtt # deprecated | 46 self.mqtt = mqtt # deprecated |
46 self.internalMqtt = internalMqtt | 47 self.internalMqtt = internalMqtt |
79 self._mqttStats.fps.mark() | 80 self._mqttStats.fps.mark() |
80 self._mqttStats.count += 1 | 81 self._mqttStats.count += 1 |
81 | 82 |
82 def getParser(self): | 83 def getParser(self): |
83 g = self.config | 84 g = self.config |
85 | |
84 parser = g.value(self.uri, ROOM['parser']) | 86 parser = g.value(self.uri, ROOM['parser']) |
85 if parser == XSD.double: | 87 if parser == XSD.double: |
86 return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) | 88 return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) |
87 elif parser == ROOM['tagIdToUri']: | 89 elif parser == ROOM['tagIdToUri']: |
88 return rx.operators.map(self.tagIdToUri) | 90 return rx.operators.map(self.tagIdToUri) |
100 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) | 102 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) |
101 | 103 |
102 def conversionStep(self, conv: Node): | 104 def conversionStep(self, conv: Node): |
103 g = self.config | 105 g = self.config |
104 if conv == ROOM['celsiusToFarenheit']: | 106 if conv == ROOM['celsiusToFarenheit']: |
105 return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) | 107 |
108 def c2f(value: Literal) -> Node: | |
109 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2)) | |
110 | |
111 return rx.operators.map(c2f) | |
106 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: | 112 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: |
107 threshold = g.value(conv, ROOM['ignoreValueBelow']) | 113 threshold = g.value(conv, ROOM['ignoreValueBelow']) |
108 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) | 114 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) |
109 else: | 115 else: |
110 raise NotImplementedError(conv) | 116 raise NotImplementedError(conv) |
121 quads = rx.operators.map(quadsFromValue)(parsed) | 127 quads = rx.operators.map(quadsFromValue)(parsed) |
122 | 128 |
123 dur = g.value(plan, ROOM['statementLifetime']) | 129 dur = g.value(plan, ROOM['statementLifetime']) |
124 if dur is not None: | 130 if dur is not None: |
125 sec = parseDurationLiteral(dur) | 131 sec = parseDurationLiteral(dur) |
132 loop = rx.scheduler.eventloop.TwistedScheduler(reactor) | |
126 quads = quads.pipe( | 133 quads = quads.pipe( |
127 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), | 134 rx.operators.debounce(sec, loop), |
128 rx.operators.map(emptyQuads), | 135 rx.operators.map(emptyQuads), |
129 rx.operators.merge(quads), | 136 rx.operators.merge(quads), |
130 ) | 137 ) |
131 | 138 |
132 return quads | 139 return quads |