view service/mqtt_to_rdf/button_events.py @ 1743:daf9deee42ca

deployment
author drewp@bigasterisk.com
date Thu, 09 Nov 2023 14:55:26 -0800
parents 7d3797ed6681
children
line wrap: on
line source

"""given repeated RF codes for a button press, make interesting button events"""

import asyncio
import time
from typing import Any, Callable, List

from rdflib import Namespace
from reactivex import Observable
from reactivex.abc.disposable import DisposableBase
from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable)
from reactivex.scheduler.eventloop import AsyncIOScheduler

ROOM = Namespace('http://projects.bigasterisk.com/room/')


def button_events(
    min_hold_sec: float,
    release_after_sec: float,
) -> Callable[[Observable], Observable]:

    def op(source: Observable) -> Observable:

        def subscribe(observer, scheduler_=None) -> DisposableBase:
            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
            cancelable = SerialDisposable()
            button_state = [None]

            press_time: List[float] = [0.]

            def set_state(state):
                button_state[0] = state
                observer.on_next(button_state[0])

            set_state(ROOM['notPressed'])

            def on_next(x: Any) -> None:
                now = time.time()
                if button_state[0] == ROOM['notPressed']:
                    set_state(ROOM['pressed'])
                    press_time[0] = now
                elif button_state[0] == ROOM['pressed']:
                    if now > press_time[0] + min_hold_sec:
                        set_state(ROOM['held'])  # should be pressed AND held
                elif button_state[0] == ROOM['held']:
                    pass
                else:
                    raise NotImplementedError(f'button_state={button_state}')

                d = SingleAssignmentDisposable()
                cancelable.disposable = d

                def action(scheduler, state=None) -> None:
                    if button_state[0] != ROOM['notPressed']:
                        set_state(ROOM['notPressed'])

                d.disposable = _scheduler.schedule_relative(release_after_sec, action)

            def on_error(exception: Exception) -> None:
                cancelable.dispose()
                observer.on_error(exception)

            def on_completed() -> None:
                raise NotImplementedError

            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
            return CompositeDisposable(subscription, cancelable)

        return Observable(subscribe)

    return op