changeset 793:c3e3bd5dfa0b

add rf button mqtt message processing
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:40:38 -0800
parents 06583e0b5885
children fafe86ae0b03
files service/mqtt_to_rdf/button_events.py service/mqtt_to_rdf/config_rf.n3 service/mqtt_to_rdf/mqtt_to_rdf.py
diffstat 3 files changed, 116 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/button_events.py	Mon Nov 30 23:40:38 2020 -0800
@@ -0,0 +1,66 @@
+"""given repeated RF codes for a button press, make interesting button events"""
+
+import time
+from typing import Any, Callable, List
+from rx.core.typing import Disposable
+from rx.core import Observable, typing
+from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
+from rx.scheduler import TimeoutScheduler
+from rdflib import Namespace
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
+def button_events(min_hold_sec: float,
+                  release_after_sec: float,
+                  scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
+
+    def op(source: Observable) -> Observable:
+
+        def subscribe(observer, scheduler_=None) -> Disposable:
+            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
+            cancelable = SerialDisposable()
+            button_state = [None]
+
+            press_time: List[float] = [0.]
+
+            def set_state(state):
+                button_state[0] = state
+                observer.on_next(button_state[0])
+
+            set_state(ROOM['notPressed'])
+
+            def on_next(x: Any) -> None:
+                now = time.time()
+                if button_state[0] == ROOM['notPressed']:
+                    set_state(ROOM['pressed'])
+                    press_time[0] = now
+                elif button_state[0] == ROOM['pressed']:
+                    if now > press_time[0] + min_hold_sec:
+                        set_state(ROOM['held'])  # should be pressed AND held
+                elif button_state[0] == ROOM['held']:
+                    pass
+                else:
+                    raise NotImplementedError(f'button_state={button_state}')
+
+                d = SingleAssignmentDisposable()
+                cancelable.disposable = d
+
+                def action(scheduler, state=None) -> None:
+                    if button_state[0] != ROOM['notPressed']:
+                        set_state(ROOM['notPressed'])
+
+                d.disposable = _scheduler.schedule_relative(release_after_sec, action)
+
+            def on_error(exception: Exception) -> None:
+                cancelable.dispose()
+                observer.on_error(exception)
+
+            def on_completed() -> None:
+                raise NotImplementedError
+
+            subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
+            return CompositeDisposable(subscription, cancelable)
+
+        return Observable(subscribe)
+
+    return op
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/config_rf.n3	Mon Nov 30 23:40:38 2020 -0800
@@ -0,0 +1,24 @@
+@prefix : <http://projects.bigasterisk.com/room/> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix fr: <http://bigasterisk.com/foaf/> .
+
+:wallButton1 a :MqttStatementSource;
+  :mqttTopic ("rfsetup" "rf_received");
+  :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7906}";
+  :parser :rfCode;
+  :conversions (:buttonPress);
+  :graphStatements [:outputPredicate :state] .
+
+:wallButton2 a :MqttStatementSource;
+  :mqttTopic ("rfsetup" "rf_received");
+  :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7905}";
+  :parser :rfCode;
+  :conversions (:buttonPress);
+  :graphStatements [:outputPredicate :state] .
+
+:wallButton3 a :MqttStatementSource;
+  :mqttTopic ("rfsetup" "rf_received");
+  :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7908}";
+  :parser :rfCode;
+  :conversions (:buttonPress);
+  :graphStatements [:outputPredicate :state] .
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:41:31 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py	Mon Nov 30 23:40:38 2020 -0800
@@ -3,6 +3,7 @@
 """
 import json
 from pathlib import Path
+from typing import Callable, cast
 
 import cyclone.web
 from docopt import docopt
@@ -26,6 +27,8 @@
 from standardservice.scalessetup import gatherProcessStats
 from twisted.internet import reactor
 
+from button_events import button_events
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 gatherProcessStats()
@@ -55,6 +58,7 @@
         self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
 
         rawBytes = self.subscribeMqtt(self.mqttTopic)
+        rawBytes = self.addFilters(rawBytes)
         rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
         parsed = self.getParser()(rawBytes)
 
@@ -67,6 +71,18 @@
 
         outputQuadsSets.subscribe_(self.updateQuads)
 
+    def addFilters(self, rawBytes):
+        jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
+        if jsonEq:
+            required = json.loads(jsonEq.toPython())
+
+            def eq(jsonBytes):
+                msg = json.loads(jsonBytes.decode('ascii'))
+                return msg == required
+
+            rawBytes = rx.operators.filter(eq)(rawBytes)
+        return rawBytes
+
     def topicFromConfig(self, config) -> bytes:
         topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
         return b'/'.join(t.encode('ascii') for t in topicParts)
@@ -94,6 +110,8 @@
             return rx.operators.map(self.parseJsonBrightness)
         elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
             return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
+        elif parser == ROOM['rfCode']:
+            return rx.operators.map(self.parseJsonRfCode)
         else:
             raise NotImplementedError(parser)
 
@@ -101,7 +119,11 @@
         msg = json.loads(mqttValue.decode('ascii'))
         return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
 
-    def conversionStep(self, conv: Node):
+    def parseJsonRfCode(self, mqttValue: bytes):
+        msg = json.loads(mqttValue.decode('ascii'))
+        return Literal('%08x%08x' % (msg['code0'], msg['code1']))
+
+    def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
         g = self.config
         if conv == ROOM['celsiusToFarenheit']:
 
@@ -112,6 +134,9 @@
         elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
             threshold = g.value(conv, ROOM['ignoreValueBelow'])
             return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
+        elif conv == ROOM['buttonPress']:
+            loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+            return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
         else:
             raise NotImplementedError(conv)