Mercurial > code > home > repos > homeauto
diff service/mqtt_to_rdf/button_events.py @ 1726:7d3797ed6681
rough port to starlette and reactivex
author | drewp@bigasterisk.com |
---|---|
date | Tue, 20 Jun 2023 23:14:28 -0700 |
parents | c3e3bd5dfa0b |
children |
line wrap: on
line diff
--- a/service/mqtt_to_rdf/button_events.py Tue Jun 20 23:13:26 2023 -0700 +++ b/service/mqtt_to_rdf/button_events.py Tue Jun 20 23:14:28 2023 -0700 @@ -1,23 +1,27 @@ """given repeated RF codes for a button press, make interesting button events""" +import asyncio import time from typing import Any, Callable, List -from rx.core.typing import Disposable -from rx.core import Observable, typing -from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable -from rx.scheduler import TimeoutScheduler + from rdflib import Namespace +from reactivex import Observable +from reactivex.abc.disposable import DisposableBase +from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable) +from reactivex.scheduler.eventloop import AsyncIOScheduler + ROOM = Namespace('http://projects.bigasterisk.com/room/') -def button_events(min_hold_sec: float, - release_after_sec: float, - scheduler=typing.Scheduler) -> Callable[[Observable], Observable]: +def button_events( + min_hold_sec: float, + release_after_sec: float, +) -> Callable[[Observable], Observable]: def op(source: Observable) -> Observable: - def subscribe(observer, scheduler_=None) -> Disposable: - _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() + def subscribe(observer, scheduler_=None) -> DisposableBase: + _scheduler = AsyncIOScheduler(asyncio.get_event_loop()) cancelable = SerialDisposable() button_state = [None] @@ -58,7 +62,7 @@ def on_completed() -> None: raise NotImplementedError - subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_) + subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_) return CompositeDisposable(subscription, cancelable) return Observable(subscribe)