Mercurial > code > home > repos > homeauto
changeset 793:c3e3bd5dfa0b
add rf button mqtt message processing
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:40:38 -0800 |
parents | 06583e0b5885 |
children | fafe86ae0b03 |
files | service/mqtt_to_rdf/button_events.py service/mqtt_to_rdf/config_rf.n3 service/mqtt_to_rdf/mqtt_to_rdf.py |
diffstat | 3 files changed, 116 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/button_events.py Mon Nov 30 23:40:38 2020 -0800 @@ -0,0 +1,66 @@ +"""given repeated RF codes for a button press, make interesting button events""" + +import time +from typing import Any, Callable, List +from rx.core.typing import Disposable +from rx.core import Observable, typing +from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable +from rx.scheduler import TimeoutScheduler +from rdflib import Namespace +ROOM = Namespace('http://projects.bigasterisk.com/room/') + + +def button_events(min_hold_sec: float, + release_after_sec: float, + scheduler=typing.Scheduler) -> Callable[[Observable], Observable]: + + def op(source: Observable) -> Observable: + + def subscribe(observer, scheduler_=None) -> Disposable: + _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + cancelable = SerialDisposable() + button_state = [None] + + press_time: List[float] = [0.] + + def set_state(state): + button_state[0] = state + observer.on_next(button_state[0]) + + set_state(ROOM['notPressed']) + + def on_next(x: Any) -> None: + now = time.time() + if button_state[0] == ROOM['notPressed']: + set_state(ROOM['pressed']) + press_time[0] = now + elif button_state[0] == ROOM['pressed']: + if now > press_time[0] + min_hold_sec: + set_state(ROOM['held']) # should be pressed AND held + elif button_state[0] == ROOM['held']: + pass + else: + raise NotImplementedError(f'button_state={button_state}') + + d = SingleAssignmentDisposable() + cancelable.disposable = d + + def action(scheduler, state=None) -> None: + if button_state[0] != ROOM['notPressed']: + set_state(ROOM['notPressed']) + + d.disposable = _scheduler.schedule_relative(release_after_sec, action) + + def on_error(exception: Exception) -> None: + cancelable.dispose() + observer.on_error(exception) + + def on_completed() -> None: + raise NotImplementedError + + subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_) + return CompositeDisposable(subscription, cancelable) + + return Observable(subscribe) + + return op
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/service/mqtt_to_rdf/config_rf.n3 Mon Nov 30 23:40:38 2020 -0800 @@ -0,0 +1,24 @@ +@prefix : <http://projects.bigasterisk.com/room/> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix fr: <http://bigasterisk.com/foaf/> . + +:wallButton1 a :MqttStatementSource; + :mqttTopic ("rfsetup" "rf_received"); + :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7906}"; + :parser :rfCode; + :conversions (:buttonPress); + :graphStatements [:outputPredicate :state] . + +:wallButton2 a :MqttStatementSource; + :mqttTopic ("rfsetup" "rf_received"); + :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7905}"; + :parser :rfCode; + :conversions (:buttonPress); + :graphStatements [:outputPredicate :state] . + +:wallButton3 a :MqttStatementSource; + :mqttTopic ("rfsetup" "rf_received"); + :filterPayloadJsonEquals "{\"protocol\":1,\"code0\":0,\"code1\":7908}"; + :parser :rfCode; + :conversions (:buttonPress); + :graphStatements [:outputPredicate :state] .
--- a/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:41:31 2020 -0800 +++ b/service/mqtt_to_rdf/mqtt_to_rdf.py Mon Nov 30 23:40:38 2020 -0800 @@ -3,6 +3,7 @@ """ import json from pathlib import Path +from typing import Callable, cast import cyclone.web from docopt import docopt @@ -26,6 +27,8 @@ from standardservice.scalessetup import gatherProcessStats from twisted.internet import reactor +from button_events import button_events + ROOM = Namespace('http://projects.bigasterisk.com/room/') gatherProcessStats() @@ -55,6 +58,7 @@ self._mqttStats = scales.collection(statPath + '/incoming', scales.IntStat('count'), scales.RecentFpsStat('fps')) rawBytes = self.subscribeMqtt(self.mqttTopic) + rawBytes = self.addFilters(rawBytes) rawBytes = rx.operators.do_action(self.countIncomingMessage)(rawBytes) parsed = self.getParser()(rawBytes) @@ -67,6 +71,18 @@ outputQuadsSets.subscribe_(self.updateQuads) + def addFilters(self, rawBytes): + jsonEq = self.config.value(self.uri, ROOM['filterPayloadJsonEquals']) + if jsonEq: + required = json.loads(jsonEq.toPython()) + + def eq(jsonBytes): + msg = json.loads(jsonBytes.decode('ascii')) + return msg == required + + rawBytes = rx.operators.filter(eq)(rawBytes) + return rawBytes + def topicFromConfig(self, config) -> bytes: topicParts = list(config.items(config.value(self.uri, ROOM['mqttTopic']))) return b'/'.join(t.encode('ascii') for t in topicParts) @@ -94,6 +110,8 @@ return rx.operators.map(self.parseJsonBrightness) elif ROOM['ValueMap'] in g.objects(parser, RDF.type): return rx.operators.map(lambda v: self.remap(parser, v.decode('ascii'))) + elif parser == ROOM['rfCode']: + return rx.operators.map(self.parseJsonRfCode) else: raise NotImplementedError(parser) @@ -101,7 +119,11 @@ msg = json.loads(mqttValue.decode('ascii')) return Literal(float(msg['brightness'] / 255) if msg['state'] == 'ON' else 0.0) - def conversionStep(self, conv: Node): + def parseJsonRfCode(self, mqttValue: bytes): + msg = json.loads(mqttValue.decode('ascii')) + return Literal('%08x%08x' % (msg['code0'], msg['code1'])) + + def conversionStep(self, conv: Node) -> Callable[[Observable], Observable]: g = self.config if conv == ROOM['celsiusToFarenheit']: @@ -112,6 +134,9 @@ elif g.value(conv, ROOM['ignoreValueBelow'], default=None) is not None: threshold = g.value(conv, ROOM['ignoreValueBelow']) return rx.operators.filter(lambda value: value.toPython() >= threshold.toPython()) + elif conv == ROOM['buttonPress']: + loop = rx.scheduler.eventloop.TwistedScheduler(reactor) + return button_events(min_hold_sec=1.0, release_after_sec=1.0, scheduler=loop) else: raise NotImplementedError(conv)