Mercurial > code > home > repos > homeauto
view service/mqtt_to_rdf/button_events.py @ 1743:daf9deee42ca
deployment
author | drewp@bigasterisk.com |
---|---|
date | Thu, 09 Nov 2023 14:55:26 -0800 |
parents | 7d3797ed6681 |
children |
line wrap: on
line source
"""given repeated RF codes for a button press, make interesting button events""" import asyncio import time from typing import Any, Callable, List from rdflib import Namespace from reactivex import Observable from reactivex.abc.disposable import DisposableBase from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable) from reactivex.scheduler.eventloop import AsyncIOScheduler ROOM = Namespace('http://projects.bigasterisk.com/room/') def button_events( min_hold_sec: float, release_after_sec: float, ) -> Callable[[Observable], Observable]: def op(source: Observable) -> Observable: def subscribe(observer, scheduler_=None) -> DisposableBase: _scheduler = AsyncIOScheduler(asyncio.get_event_loop()) cancelable = SerialDisposable() button_state = [None] press_time: List[float] = [0.] def set_state(state): button_state[0] = state observer.on_next(button_state[0]) set_state(ROOM['notPressed']) def on_next(x: Any) -> None: now = time.time() if button_state[0] == ROOM['notPressed']: set_state(ROOM['pressed']) press_time[0] = now elif button_state[0] == ROOM['pressed']: if now > press_time[0] + min_hold_sec: set_state(ROOM['held']) # should be pressed AND held elif button_state[0] == ROOM['held']: pass else: raise NotImplementedError(f'button_state={button_state}') d = SingleAssignmentDisposable() cancelable.disposable = d def action(scheduler, state=None) -> None: if button_state[0] != ROOM['notPressed']: set_state(ROOM['notPressed']) d.disposable = _scheduler.schedule_relative(release_after_sec, action) def on_error(exception: Exception) -> None: cancelable.dispose() observer.on_error(exception) def on_completed() -> None: raise NotImplementedError subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_) return CompositeDisposable(subscription, cancelable) return Observable(subscribe) return op