diff service/mqtt_to_rdf/button_events.py @ 1726:7d3797ed6681

rough port to starlette and reactivex
author drewp@bigasterisk.com
date Tue, 20 Jun 2023 23:14:28 -0700
parents c3e3bd5dfa0b
children
line wrap: on
line diff
--- a/service/mqtt_to_rdf/button_events.py	Tue Jun 20 23:13:26 2023 -0700
+++ b/service/mqtt_to_rdf/button_events.py	Tue Jun 20 23:14:28 2023 -0700
@@ -1,23 +1,27 @@
 """given repeated RF codes for a button press, make interesting button events"""
 
+import asyncio
 import time
 from typing import Any, Callable, List
-from rx.core.typing import Disposable
-from rx.core import Observable, typing
-from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
-from rx.scheduler import TimeoutScheduler
+
 from rdflib import Namespace
+from reactivex import Observable
+from reactivex.abc.disposable import DisposableBase
+from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable)
+from reactivex.scheduler.eventloop import AsyncIOScheduler
+
 ROOM = Namespace('http://projects.bigasterisk.com/room/')
 
 
-def button_events(min_hold_sec: float,
-                  release_after_sec: float,
-                  scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
+def button_events(
+    min_hold_sec: float,
+    release_after_sec: float,
+) -> Callable[[Observable], Observable]:
 
     def op(source: Observable) -> Observable:
 
-        def subscribe(observer, scheduler_=None) -> Disposable:
-            _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
+        def subscribe(observer, scheduler_=None) -> DisposableBase:
+            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
             cancelable = SerialDisposable()
             button_state = [None]
 
@@ -58,7 +62,7 @@
             def on_completed() -> None:
                 raise NotImplementedError
 
-            subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
+            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
             return CompositeDisposable(subscription, cancelable)
 
         return Observable(subscribe)