Mercurial > code > home > repos > homeauto
comparison service/mqtt_to_rdf/button_events.py @ 793:c3e3bd5dfa0b
add rf button mqtt message processing
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 Nov 2020 23:40:38 -0800 |
parents | |
children | 7d3797ed6681 |
comparison
equal
deleted
inserted
replaced
792:06583e0b5885 | 793:c3e3bd5dfa0b |
---|---|
1 """given repeated RF codes for a button press, make interesting button events""" | |
2 | |
3 import time | |
4 from typing import Any, Callable, List | |
5 from rx.core.typing import Disposable | |
6 from rx.core import Observable, typing | |
7 from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable | |
8 from rx.scheduler import TimeoutScheduler | |
9 from rdflib import Namespace | |
10 ROOM = Namespace('http://projects.bigasterisk.com/room/') | |
11 | |
12 | |
13 def button_events(min_hold_sec: float, | |
14 release_after_sec: float, | |
15 scheduler=typing.Scheduler) -> Callable[[Observable], Observable]: | |
16 | |
17 def op(source: Observable) -> Observable: | |
18 | |
19 def subscribe(observer, scheduler_=None) -> Disposable: | |
20 _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton() | |
21 cancelable = SerialDisposable() | |
22 button_state = [None] | |
23 | |
24 press_time: List[float] = [0.] | |
25 | |
26 def set_state(state): | |
27 button_state[0] = state | |
28 observer.on_next(button_state[0]) | |
29 | |
30 set_state(ROOM['notPressed']) | |
31 | |
32 def on_next(x: Any) -> None: | |
33 now = time.time() | |
34 if button_state[0] == ROOM['notPressed']: | |
35 set_state(ROOM['pressed']) | |
36 press_time[0] = now | |
37 elif button_state[0] == ROOM['pressed']: | |
38 if now > press_time[0] + min_hold_sec: | |
39 set_state(ROOM['held']) # should be pressed AND held | |
40 elif button_state[0] == ROOM['held']: | |
41 pass | |
42 else: | |
43 raise NotImplementedError(f'button_state={button_state}') | |
44 | |
45 d = SingleAssignmentDisposable() | |
46 cancelable.disposable = d | |
47 | |
48 def action(scheduler, state=None) -> None: | |
49 if button_state[0] != ROOM['notPressed']: | |
50 set_state(ROOM['notPressed']) | |
51 | |
52 d.disposable = _scheduler.schedule_relative(release_after_sec, action) | |
53 | |
54 def on_error(exception: Exception) -> None: | |
55 cancelable.dispose() | |
56 observer.on_error(exception) | |
57 | |
58 def on_completed() -> None: | |
59 raise NotImplementedError | |
60 | |
61 subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_) | |
62 return CompositeDisposable(subscription, cancelable) | |
63 | |
64 return Observable(subscribe) | |
65 | |
66 return op |