0
|
1 import time
|
|
2 import logging
|
|
3 import sys
|
|
4 import paho.mqtt.client as mqtt
|
|
5 from pyfirmata import Arduino, INPUT
|
|
6
|
|
7 PULLUP = 0x0b
|
|
8
|
|
9 MIN_REPEAT_SEC = .5
|
|
10
|
|
11 logging.basicConfig(level=logging.INFO)
|
|
12 log = logging.getLogger()
|
|
13
|
|
14 dev, = sys.argv[1:]
|
|
15
|
|
16 client = mqtt.Client()
|
|
17 client.will_set('doorbell/online', '0', qos=0, retain=False)
|
|
18 client.connect('bang')
|
|
19
|
|
20 log.info(f'connecting to {dev}')
|
|
21 # note that I set StandardFirmata to double baud rate
|
|
22 board = Arduino(dev, baudrate=115200)
|
|
23 log.info('done')
|
|
24
|
|
25 pin = board.get_pin('d:2:i')
|
|
26 pin.mode = PULLUP # config msg to send to firmata
|
|
27 pin._mode = INPUT # fake value so pyfirmata still reads this pin
|
|
28
|
|
29 log.info('running mqtt client')
|
|
30 client.loop_start()
|
|
31 log.info('done')
|
|
32
|
|
33 client.publish('doorbell/online', '1')
|
|
34
|
|
35 last_value = None
|
|
36 last_edge = 0
|
|
37
|
|
38
|
|
39 def on_press(t):
|
|
40 global last_edge
|
|
41 if t > last_edge + MIN_REPEAT_SEC:
|
|
42 log.info(f'press {t=}')
|
|
43 client.publish('doorbell/button', 'press')
|
|
44 log.info('published')
|
|
45 last_edge = t
|
|
46
|
|
47
|
|
48 try:
|
|
49 while True:
|
|
50 now = time.time()
|
|
51 board.iterate()
|
|
52 value = pin.read()
|
|
53 if value == 0 and last_value == 1:
|
|
54 on_press(now)
|
|
55 last_value = value
|
|
56 client.loop()
|
|
57 except Exception:
|
|
58 client.publish('doorbell/online', '0')
|
|
59 client.disconnect()
|
|
60 raise |