diff service/mqtt_to_rdf/mqtt_to_rdf.py @ 793:c3e3bd5dfa0b

add rf button mqtt message processing
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:40:38 -0800
parents 8f4e814eb1ab
children fc74ae6d5d68
line wrap: on
line diff
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:41:31 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:40:38 2020 -0800
@@ -3,6 +3,7 @@
 """
 import json
 from pathlib import Path
+from typing import Callable, cast
 
 import cyclone.web
 from docopt import docopt
@@ -26,6 +27,8 @@
 from standardservice.scalessetup import gatherProcessStats
 from twisted.internet import reactor
 
+from button_events import button_events
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 gatherProcessStats()
@@ -55,6 +58,7 @@
         self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
 
         rawBytes = self.subscribeMqtt(self.mqttTopic)
+        rawBytes = self.addFilters(rawBytes)
         rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
         parsed = self.getParser()(rawBytes)
 
@@ -67,6 +71,18 @@
 
         outputQuadsSets.subscribe_(self.updateQuads)
 
+    def addFilters(self, rawBytes):
+        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
+        if jsonEq:
+            required = json.loads(jsonEq.toPython())
+
+            def eq(jsonBytes):
+                msg = json.loads(jsonBytes.decode('ascii'))
+                return msg == required
+
+            rawBytes = rx.operators.filter(eq)(rawBytes)
+        return rawBytes
+
     def topicFromConfig(self, config) -> bytes:
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
@@ -94,6 +110,8 @@
             return rx.operators.map(self.parseJsonBrightness)
         elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
             return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
+        elif parser == ROOM['rfCode']:
+            return rx.operators.map(self.parseJsonRfCode)
         else:
             raise NotImplementedError(parser)
 
@@ -101,7 +119,11 @@
         msg = json.loads(mqttValue.decode('ascii'))
         return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
 
-    def conversionStep(self, conv: Node):
+    def parseJsonRfCode(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('ascii'))
+        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
+
+    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
         g = self.config
         if conv == ROOM['celsiusToFarenheit']:
 
@@ -112,6 +134,9 @@
         elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
             threshold = g.value(conv, ROOM['ignoreValueBelow'])
             return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
+        elif conv == ROOM['buttonPress']:
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
         else:
             raise NotImplementedError(conv)