view service/mqtt_to_rdf/button_events.py @ 1726:7d3797ed6681

rough port to starlette and reactivex
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:14:28 -0700
parents c3e3bd5dfa0b
children
line wrap: on
line source

"""given repeated RF codes for a button press, make interesting button events"""

import asyncio
import time
from typing import Any, Callable, List

from rdflib import Namespace
from reactivex import Observable
from reactivex.abc.disposable import DisposableBase
from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable)
from reactivex.scheduler.eventloop import AsyncIOScheduler

ROOM = Namespace('http://projects.bigasterisk.com/room/')


def button_events(
    min_hold_sec: float,
    release_after_sec: float,
) -> Callable[[Observable], Observable]:
    """Build an operator that turns repeated button codes into state events.

    Each item from the source is treated as "the button is currently down"
    (the item's value itself is ignored). The returned observable emits
    ROOM['notPressed'], ROOM['pressed'] and ROOM['held'] whenever the tracked
    state changes:

    * ROOM['notPressed'] is emitted immediately on subscribe.
    * The first item while not pressed emits ROOM['pressed'].
    * An item arriving more than `min_hold_sec` after the press emits
      ROOM['held'] (once).
    * When no item has arrived for `release_after_sec`, ROOM['notPressed']
      is emitted.
    * When the source completes, a still-down button is released
      (ROOM['notPressed']) and completion is forwarded.

    Args:
        min_hold_sec: seconds after the initial press before a further item
            upgrades the state to 'held'.
        release_after_sec: seconds of silence after the latest item before
            the button is considered released.

    Returns:
        A function mapping a source Observable to the button-state Observable.
    """

    def op(source: Observable) -> Observable:

        def subscribe(observer, scheduler_=None) -> DisposableBase:
            # NOTE(review): the release timer always runs on the asyncio loop;
            # `scheduler_` is only passed through to the source subscription.
            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
            # Holds the currently pending release timer; assigning a new one
            # is expected to dispose the previous one.
            cancelable = SerialDisposable()
            # One-element lists act as mutable cells shared by the closures.
            button_state = [None]
            press_time: List[float] = [0.]

            def set_state(state):
                button_state[0] = state
                observer.on_next(button_state[0])

            # Subscribers always get an initial 'notPressed'.
            set_state(ROOM['notPressed'])

            def on_next(x: Any) -> None:
                # Monotonic clock: a wall-clock adjustment must not fake or
                # hide a hold.
                now = time.monotonic()
                if button_state[0] == ROOM['notPressed']:
                    set_state(ROOM['pressed'])
                    press_time[0] = now
                elif button_state[0] == ROOM['pressed']:
                    if now - press_time[0] > min_hold_sec:
                        set_state(ROOM['held'])  # should be pressed AND held
                elif button_state[0] == ROOM['held']:
                    pass
                else:
                    raise NotImplementedError(f'button_state={button_state}')

                # (Re)arm the release timer; every incoming code pushes the
                # release further out.
                d = SingleAssignmentDisposable()
                cancelable.disposable = d

                def action(scheduler, state=None) -> None:
                    if button_state[0] != ROOM['notPressed']:
                        set_state(ROOM['notPressed'])

                d.disposable = _scheduler.schedule_relative(release_after_sec, action)

            def on_error(exception: Exception) -> None:
                cancelable.dispose()
                observer.on_error(exception)

            def on_completed() -> None:
                # Stop the pending timer so nothing is emitted after
                # completion, release a button that is still down, then
                # propagate completion downstream.
                cancelable.dispose()
                if button_state[0] != ROOM['notPressed']:
                    set_state(ROOM['notPressed'])
                observer.on_completed()

            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
            return CompositeDisposable(subscription, cancelable)

        return Observable(subscribe)

    return op