Mercurial > code > home > repos > homeauto
annotate service/mqtt_to_rdf/button_events.py @ 1726:7d3797ed6681
rough port to starlette and reactivex
author | drewp@bigasterisk.com |
---|---|
date | Tue, 20 Jun 2023 23:14:28 -0700 |
parents | c3e3bd5dfa0b |
children |
rev | line source |
---|---|
793 | 1 """given repeated RF codes for a button press, make interesting button events""" |
2 | |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
3 import asyncio |
793 | 4 import time |
5 from typing import Any, Callable, List | |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
6 |
793 | 7 from rdflib import Namespace |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
8 from reactivex import Observable |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
9 from reactivex.abc.disposable import DisposableBase |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
10 from reactivex.disposable import (CompositeDisposable, SerialDisposable, SingleAssignmentDisposable) |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
11 from reactivex.scheduler.eventloop import AsyncIOScheduler |
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
12 |
793 | 13 ROOM = Namespace('http://projects.bigasterisk.com/room/') |
14 | |
15 | |
1726
7d3797ed6681
rough port to starlette and reactivex
drewp@bigasterisk.com
parents:
793
diff
changeset
|
def button_events(
    min_hold_sec: float,
    release_after_sec: float,
) -> Callable[[Observable], Observable]:
    """Build an operator that turns repeated button codes into button states.

    The returned operator wraps a source observable whose items each mean
    "the button is (still) being pressed right now"; the item values
    themselves are ignored. The output observable emits ROOM URIs:

    * ROOM['notPressed'] -- emitted immediately on subscribe, and again
      whenever no source item has arrived for `release_after_sec` seconds.
    * ROOM['pressed'] -- emitted on the first source item after a
      notPressed state.
    * ROOM['held'] -- emitted once when a source item arrives more than
      `min_hold_sec` seconds after the press started.

    Args:
        min_hold_sec: how long (seconds, wall clock via time.time()) the
            repeats must continue before 'pressed' is upgraded to 'held'.
        release_after_sec: quiet period (seconds) after the last source
            item before the button is reported as 'notPressed'.

    Returns:
        A function suitable for `Observable.pipe(...)`.
    """

    def op(source: Observable) -> Observable:

        def subscribe(observer, scheduler_=None) -> DisposableBase:
            # The release timer always runs on the asyncio event loop;
            # `scheduler_` is only passed through to the source subscription.
            _scheduler = AsyncIOScheduler(asyncio.get_event_loop())
            # Holds the currently pending release timer. Assigning a new
            # disposable to a SerialDisposable disposes the previous one,
            # so every source item restarts the release countdown.
            cancelable = SerialDisposable()
            # One-element lists act as mutable cells shared by the closures.
            button_state = [None]
            press_time: List[float] = [0.]

            def set_state(state):
                """Record the new state and emit it downstream."""
                button_state[0] = state
                observer.on_next(button_state[0])

            # Subscribers always start from a known state.
            set_state(ROOM['notPressed'])

            def on_next(x: Any) -> None:
                now = time.time()
                if button_state[0] == ROOM['notPressed']:
                    set_state(ROOM['pressed'])
                    press_time[0] = now
                elif button_state[0] == ROOM['pressed']:
                    if now > press_time[0] + min_hold_sec:
                        set_state(ROOM['held'])  # should be pressed AND held
                elif button_state[0] == ROOM['held']:
                    pass
                else:
                    raise NotImplementedError(f'button_state={button_state}')

                # (Re)arm the release timer; the previous one is cancelled
                # by the SerialDisposable assignment.
                d = SingleAssignmentDisposable()
                cancelable.disposable = d

                def action(scheduler, state=None) -> None:
                    # No repeats arrived for release_after_sec: released.
                    if button_state[0] != ROOM['notPressed']:
                        set_state(ROOM['notPressed'])

                d.disposable = _scheduler.schedule_relative(release_after_sec, action)

            def on_error(exception: Exception) -> None:
                cancelable.dispose()
                observer.on_error(exception)

            def on_completed() -> None:
                # Previously this raised NotImplementedError, so a source
                # that completed crashed instead of completing downstream.
                # Cancel the pending release timer first so nothing is
                # emitted after completion.
                cancelable.dispose()
                # No more repeats can ever arrive, so report the release
                # now rather than leaving downstream stuck on pressed/held.
                if button_state[0] != ROOM['notPressed']:
                    set_state(ROOM['notPressed'])
                observer.on_completed()

            subscription = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)
            # Disposing the result tears down both the source subscription
            # and any pending release timer.
            return CompositeDisposable(subscription, cancelable)

        return Observable(subscribe)

    return op