comparison service/mqtt_to_rdf/button_events.py @ 793:c3e3bd5dfa0b

add rf button mqtt message processing
author drewp@bigasterisk.com
date Mon, 30 Nov 2020 23:40:38 -0800
parents
children 7d3797ed6681
comparison
equal deleted inserted replaced
792:06583e0b5885 793:c3e3bd5dfa0b
1 """given repeated RF codes for a button press, make interesting button events"""
2
3 import time
4 from typing import Any, Callable, List
5 from rx.core.typing import Disposable
6 from rx.core import Observable, typing
7 from rx.disposable import CompositeDisposable, SingleAssignmentDisposable, SerialDisposable
8 from rx.scheduler import TimeoutScheduler
9 from rdflib import Namespace
10 ROOM = Namespace('http://projects.bigasterisk.com/room/')
11
12
13 def button_events(min_hold_sec: float,
14 release_after_sec: float,
15 scheduler=typing.Scheduler) -> Callable[[Observable], Observable]:
16
17 def op(source: Observable) -> Observable:
18
19 def subscribe(observer, scheduler_=None) -> Disposable:
20 _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()
21 cancelable = SerialDisposable()
22 button_state = [None]
23
24 press_time: List[float] = [0.]
25
26 def set_state(state):
27 button_state[0] = state
28 observer.on_next(button_state[0])
29
30 set_state(ROOM['notPressed'])
31
32 def on_next(x: Any) -> None:
33 now = time.time()
34 if button_state[0] == ROOM['notPressed']:
35 set_state(ROOM['pressed'])
36 press_time[0] = now
37 elif button_state[0] == ROOM['pressed']:
38 if now > press_time[0] + min_hold_sec:
39 set_state(ROOM['held']) # should be pressed AND held
40 elif button_state[0] == ROOM['held']:
41 pass
42 else:
43 raise NotImplementedError(f'button_state={button_state}')
44
45 d = SingleAssignmentDisposable()
46 cancelable.disposable = d
47
48 def action(scheduler, state=None) -> None:
49 if button_state[0] != ROOM['notPressed']:
50 set_state(ROOM['notPressed'])
51
52 d.disposable = _scheduler.schedule_relative(release_after_sec, action)
53
54 def on_error(exception: Exception) -> None:
55 cancelable.dispose()
56 observer.on_error(exception)
57
58 def on_completed() -> None:
59 raise NotImplementedError
60
61 subscription = source.subscribe_(on_next, on_error, on_completed, scheduler=scheduler_)
62 return CompositeDisposable(subscription, cancelable)
63
64 return Observable(subscribe)
65
66 return op