comparison service/mqtt_to_rdf/mqtt_to_rdf.py @ 791:8f4e814eb1ab

cleanup
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:39:30 -0800
parents 13970578a443
children c3e3bd5dfa0b
comparison
equal deleted inserted replaced
790:b85986495848 791:8f4e814eb1ab
17 ) 17 )
18 from rdfdb.rdflibpatch import graphFromQuads 18 from rdfdb.rdflibpatch import graphFromQuads
19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD 19 from rdflib import Graph, Literal, Namespace, RDF, URIRef, XSD
20 from rdflib.term import Node 20 from rdflib.term import Node
21 import rx 21 import rx
22 from rx.core import Observable
22 import rx.operators 23 import rx.operators
23 import rx.scheduler.eventloop 24 import rx.scheduler.eventloop
24 from standardservice.logsetup import log, verboseLogging 25 from standardservice.logsetup import log, verboseLogging
25 from standardservice.scalessetup import gatherProcessStats 26 from standardservice.scalessetup import gatherProcessStats
26 from twisted.internet import reactor 27 from twisted.internet import reactor
36 raise NotImplementedError(f'duration literal: {lit}') 37 raise NotImplementedError(f'duration literal: {lit}')
37 38
38 39
39 class MqttStatementSource: 40 class MqttStatementSource:
40 41
41 def __init__(self, uri, config, masterGraph, mqtt, internalMqtt, influx): 42 def __init__(self, uri: URIRef, config: Graph, masterGraph: PatchableGraph, mqtt, internalMqtt, influx):
42 self.uri = uri 43 self.uri = uri
43 self.config = config 44 self.config = config
44 self.masterGraph = masterGraph 45 self.masterGraph = masterGraph
45 self.mqtt = mqtt # deprecated 46 self.mqtt = mqtt # deprecated
46 self.internalMqtt = internalMqtt 47 self.internalMqtt = internalMqtt
79 self._mqttStats.fps.mark() 80 self._mqttStats.fps.mark()
80 self._mqttStats.count += 1 81 self._mqttStats.count += 1
81 82
82 def getParser(self): 83 def getParser(self):
83 g = self.config 84 g = self.config
85
84 parser = g.value(self.uri, ROOM['parser']) 86 parser = g.value(self.uri, ROOM['parser'])
85 if parser == XSD.double: 87 if parser == XSD.double:
86 return rx.operators.map(lambda v: Literal(float(v.decode('ascii')))) 88 return rx.operators.map(lambda v: Literal(float(v.decode('ascii'))))
87 elif parser == ROOM['tagIdToUri']: 89 elif parser == ROOM['tagIdToUri']:
88 return rx.operators.map(self.tagIdToUri) 90 return rx.operators.map(self.tagIdToUri)
100 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) 102 return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
101 103
102 def conversionStep(self, conv: Node): 104 def conversionStep(self, conv: Node):
103 g = self.config 105 g = self.config
104 if conv == ROOM['celsiusToFarenheit']: 106 if conv == ROOM['celsiusToFarenheit']:
105 return rx.operators.map(lambda value: Literal(round(value.toPython() * 1.8 + 32, 2))) 107
108 def c2f(value: Literal) -> Node:
109 return Literal(round(cast(float, value.toPython()) * 1.8 + 32, 2))
110
111 return rx.operators.map(c2f)
106 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: 112 elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
107 threshold = g.value(conv, ROOM['ignoreValueBelow']) 113 threshold = g.value(conv, ROOM['ignoreValueBelow'])
108 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) 114 return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
109 else: 115 else:
110 raise NotImplementedError(conv) 116 raise NotImplementedError(conv)
121 quads = rx.operators.map(quadsFromValue)(parsed) 127 quads = rx.operators.map(quadsFromValue)(parsed)
122 128
123 dur = g.value(plan, ROOM['statementLifetime']) 129 dur = g.value(plan, ROOM['statementLifetime'])
124 if dur is not None: 130 if dur is not None:
125 sec = parseDurationLiteral(dur) 131 sec = parseDurationLiteral(dur)
132 loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
126 quads = quads.pipe( 133 quads = quads.pipe(
127 rx.operators.debounce(sec, rx.scheduler.eventloop.TwistedScheduler(reactor)), 134 rx.operators.debounce(sec, loop),
128 rx.operators.map(emptyQuads), 135 rx.operators.map(emptyQuads),
129 rx.operators.merge(quads), 136 rx.operators.merge(quads),
130 ) 137 )
131 138
132 return quads 139 return quads