Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 793:c3e3bd5dfa0b
add rf button mqtt message processing
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:40:38 -0800 |
parents | 8f4e814eb1ab |
children | fc74ae6d5d68 |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:41:31 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:40:38 2020 -0800 @@ -3,6 +3,7 @@ """ import json from pathlib import Path +from typing import Callable, cast import cyclone.web from docopt import docopt @@ -26,6 +27,8 @@ from standardservice.scalessetup import gatherProcessStats from twisted.internet import reactor +from button_events import button_events + ROOM = Namespace('http://projects.bigasterisk.com/room/') gatherProcessStats() @@ -55,6 +58,7 @@ self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) + rawBytes = self.addFilters(rawBytes) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) parsed = self.getParser()(rawBytes) @@ -67,6 +71,18 @@ outputQuadsSets.subscribe_(self.updateQuads) + def addFilters(self, rawBytes): + jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) + if jsonEq: + required = json.loads(jsonEq.toPython()) + + def eq(jsonBytes): + msg = json.loads(jsonBytes.decode('ascii')) + return msg == required + + rawBytes = rx.operators.filter(eq)(rawBytes) + return rawBytes + def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) @@ -94,6 +110,8 @@ return rx.operators.map(self.parseJsonBrightness) elif ROOM['ValueMap'] in g.objects(parser, RDF.type): return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) + elif parser == ROOM['rfCode']: + return rx.operators.map(self.parseJsonRfCode) else: raise NotImplementedError(parser) @@ -101,7 +119,11 @@ msg = json.loads(mqttValue.decode('ascii')) return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) - def conversionStep(self, conv: Node): + def parseJsonRfCode(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('ascii')) + return Literal('%08x%08x' % (msg['code0'], msg['code1'])) + + def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: g = self.config if conv == ROOM['celsiusToFarenheit']: @@ -112,6 +134,9 @@ elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: threshold = g.value(conv, ROOM['ignoreValueBelow']) return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) + elif conv == ROOM['buttonPress']: + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) else: raise NotImplementedError(conv)