# HG changeset patch
# User drewp@bigasterisk.com
# Date 1606808438 28800
# Node ID c3e3bd5dfa0b6183b785366223048b8b28010f29
# Parent 06583e0b5885a377cff8adedf6837606606ec2b8
add rf button mqtt message processing
diff -r 06583e0b5885 -r c3e3bd5dfa0b service/mqtt_to_rdf/button_events.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/button_events.py Mon Nov 30 23:40:38 2020 -0800
@@ -0,0 +1,66 @@
+"""given repeated RF codes for a button press, make interesting button events"""
+
+import time
+from typing import Any, Callable, List
+from rx.core.typing import Disposable
+from rx.core import Observable, typing
+from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
+from rx.scheduler import TimeoutScheduler
+from rdflib import Namespace
+ROOM = Namespace('http://projects.bigasterisk.com/room/')
+
+
+def button_events(min_hold_sec: float,
+ release_after_sec: float,
+ scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
+
+ def op(source: Observable) -> Observable:
+
+ def subscribe(observer, scheduler_=None) -> Disposable:
+ _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
+ cancelable = SerialDisposable()
+ button_state = [None]
+
+ press_time: List[float] = [0.]
+
+ def set_state(state):
+ button_state[0] = state
+ observer.on_next(button_state[0])
+
+ set_state(ROOM['notPressed'])
+
+ def on_next(x: Any) -> None:
+ now = time.time()
+ if button_state[0] == ROOM['notPressed']:
+ set_state(ROOM['pressed'])
+ press_time[0] = now
+ elif button_state[0] == ROOM['pressed']:
+ if now > press_time[0] + min_hold_sec:
+ set_state(ROOM['held']) # should be pressed AND held
+ elif button_state[0] == ROOM['held']:
+ pass
+ else:
+ raise NotImplementedError(f'button_state={button_state}')
+
+ d = SingleAssignmentDisposable()
+ cancelable.disposable = d
+
+ def action(scheduler, state=None) -> None:
+ if button_state[0] != ROOM['notPressed']:
+ set_state(ROOM['notPressed'])
+
+ d.disposable = _scheduler.schedule_relative(release_after_sec, action)
+
+ def on_error(exception: Exception) -> None:
+ cancelable.dispose()
+ observer.on_error(exception)
+
+ def on_completed() -> None:
+ raise NotImplementedError
+
+ subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
+ return CompositeDisposable(subscription, cancelable)
+
+ return Observable(subscribe)
+
+ return op
diff -r 06583e0b5885 -r c3e3bd5dfa0b service/mqtt_to_rdf/config_rf.n3
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/service/mqtt_to_rdf/config_rf.n3 Mon Nov 30 23:40:38 2020 -0800
@@ -0,0 +1,24 @@
+@prefix : .
+@prefix rdfs: .
+@prefix fr: .
+
+:wallButton1 a :MqttStatementSource;
+ :mqttTopic ("rfsetup" "rf_received");
+ :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7906}";
+ :parser :rfCode;
+ :conversions (:buttonPress);
+ :graphStatements [:outputPredicate :state] .
+
+:wallButton2 a :MqttStatementSource;
+ :mqttTopic ("rfsetup" "rf_received");
+ :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7905}";
+ :parser :rfCode;
+ :conversions (:buttonPress);
+ :graphStatements [:outputPredicate :state] .
+
+:wallButton3 a :MqttStatementSource;
+ :mqttTopic ("rfsetup" "rf_received");
+ :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7908}";
+ :parser :rfCode;
+ :conversions (:buttonPress);
+ :graphStatements [:outputPredicate :state] .
diff -r 06583e0b5885 -r c3e3bd5dfa0b service/mqtt_to_rdf/mqtt_to_rdf.py
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:41:31 2020 -0800
+++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:40:38 2020 -0800
@@ -3,6 +3,7 @@
"""
import json
from pathlib import Path
+from typing import Callable, cast
import cyclone.web
from docopt import docopt
@@ -26,6 +27,8 @@
from standardservice.scalessetup import gatherProcessStats
from twisted.internet import reactor
+from button_events import button_events
+
ROOM = Namespace('http://projects.bigasterisk.com/room/')
gatherProcessStats()
@@ -55,6 +58,7 @@
self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps'))
rawBytes = self.subscribeMqtt(self.mqttTopic)
+ rawBytes = self.addFilters(rawBytes)
rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes)
parsed = self.getParser()(rawBytes)
@@ -67,6 +71,18 @@
outputQuadsSets.subscribe_(self.updateQuads)
+ def addFilters(self, rawBytes):
+ jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals'])
+ if jsonEq:
+ required = json.loads(jsonEq.toPython())
+
+ def eq(jsonBytes):
+ msg = json.loads(jsonBytes.decode('ascii'))
+ return msg == required
+
+ rawBytes = rx.operators.filter(eq)(rawBytes)
+ return rawBytes
+
def topicFromConfig(self, config) -> bytes:
topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic'])))
return b'/'.join(t.encode('ascii') for t in topicParts)
@@ -94,6 +110,8 @@
return rx.operators.map(self.parseJsonBrightness)
elif ROOM['ValueMap'] in g.objects(parser, RDF.type):
return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii')))
+ elif parser == ROOM['rfCode']:
+ return rx.operators.map(self.parseJsonRfCode)
else:
raise NotImplementedError(parser)
@@ -101,7 +119,11 @@
msg = json.loads(mqttValue.decode('ascii'))
return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0)
- def conversionStep(self, conv: Node):
+ def parseJsonRfCode(self, mqttValue: bytes):
+ msg = json.loads(mqttValue.decode('ascii'))
+ return Literal('%08x%08x' % (msg['code0'], msg['code1']))
+
+ def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]:
g = self.config
if conv == ROOM['celsiusToFarenheit']:
@@ -112,6 +134,9 @@
elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None:
threshold = g.value(conv, ROOM['ignoreValueBelow'])
return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython())
+ elif conv == ROOM['buttonPress']:
+ loop = rx.scheduler.eventloop.TwistedScheduler(reactor)
+ return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop)
else:
raise NotImplementedError(conv)