793
|
1 """given repeated RF codes for a button press, make interesting button events"""
|
|
2
|
|
3 import time
|
|
4 from typing import Any, Callable, List
|
|
5 from rx.core.typing import Disposable
|
|
6 from rx.core import Observable, typing
|
|
7 from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
|
|
8 from rx.scheduler import TimeoutScheduler
|
|
9 from rdflib import Namespace
|
|
10 ROOM = Namespace('http://projects.bigasterisk.com/room/')
|
|
11
|
|
12
|
|
13 def button_events(min_hold_sec: float,
|
|
14 release_after_sec: float,
|
|
15 scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
|
|
16
|
|
17 def op(source: Observable) -> Observable:
|
|
18
|
|
19 def subscribe(observer, scheduler_=None) -> Disposable:
|
|
20 _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
|
|
21 cancelable = SerialDisposable()
|
|
22 button_state = [None]
|
|
23
|
|
24 press_time: List[float] = [0.]
|
|
25
|
|
26 def set_state(state):
|
|
27 button_state[0] = state
|
|
28 observer.on_next(button_state[0])
|
|
29
|
|
30 set_state(ROOM['notPressed'])
|
|
31
|
|
32 def on_next(x: Any) -> None:
|
|
33 now = time.time()
|
|
34 if button_state[0] == ROOM['notPressed']:
|
|
35 set_state(ROOM['pressed'])
|
|
36 press_time[0] = now
|
|
37 elif button_state[0] == ROOM['pressed']:
|
|
38 if now > press_time[0] + min_hold_sec:
|
|
39 set_state(ROOM['held']) # should be pressed AND held
|
|
40 elif button_state[0] == ROOM['held']:
|
|
41 pass
|
|
42 else:
|
|
43 raise NotImplementedError(f'button_state={button_state}')
|
|
44
|
|
45 d = SingleAssignmentDisposable()
|
|
46 cancelable.disposable = d
|
|
47
|
|
48 def action(scheduler, state=None) -> None:
|
|
49 if button_state[0] != ROOM['notPressed']:
|
|
50 set_state(ROOM['notPressed'])
|
|
51
|
|
52 d.disposable = _scheduler.schedule_relative(release_after_sec, action)
|
|
53
|
|
54 def on_error(exception: Exception) -> None:
|
|
55 cancelable.dispose()
|
|
56 observer.on_error(exception)
|
|
57
|
|
58 def on_completed() -> None:
|
|
59 raise NotImplementedError
|
|
60
|
|
61 subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
|
|
62 return CompositeDisposable(subscription, cancelable)
|
|
63
|
|
64 return Observable(subscribe)
|
|
65
|
|
66 return op
|