Mercurial > code > home > repos > doorbell
comparison doorbell_to_mqtt.py @ 2:f822e7fe7120
rewrite to use telemetrix (localized)
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:06:19 -0800 |
parents | 7bd85b962845 |
children |
comparison
equal
deleted
inserted
replaced
1:125c794511a6 | 2:f822e7fe7120 |
---|---|
1 import time | 1 import asyncio |
2 import logging | 2 import logging |
3 import sys | 3 import sys |
4 import paho.mqtt.client as mqtt | |
5 from pyfirmata import Arduino, INPUT | |
6 | 4 |
7 PULLUP = 0x0b | 5 import asyncio_mqtt |
| | 6 import uvicorn |
| | 7 from starlette.applications import Starlette |
| | 8 from starlette.requests import Request |
| | 9 from starlette.responses import HTMLResponse, RedirectResponse |
| | 10 from starlette.routing import Route |
| | 11 from typing import Optional |
| | 12 import telemetrix_local as telemetrix_aio |
8 | 13 |
9 MIN_REPEAT_SEC = .5 | 14 DIGITAL_PIN = 2 # arduino pin number |
| | 15 KILL_TIME = 5 # sleep time to keep forever loop open |
| | 16 MQTT_RECONNECT_SEC = 5 |
| | 17 MIN_REPEAT_SEC = 2 |
10 | 18 |
11 logging.basicConfig(level=logging.INFO) | 19 logging.basicConfig(level=logging.DEBUG) |
12 log = logging.getLogger() | 20 log = logging.getLogger() |
13 | 21 |
14 dev, = sys.argv[1:] | 22 dev, = sys.argv[1:] |
15 | 23 |
16 client = mqtt.Client() | |
17 client.will_set('doorbell/online', '0', qos=0, retain=False) | |
18 client.connect('bang') | |
19 | 24 |
20 log.info(f'connecting to {dev}') | 25 async def digital_in_pullup(my_board, pin): |
21 # note that I set StandardFirmata to double baud rate | 26 last_report = 0 |
22 board = Arduino(dev, baudrate=115200) | |
23 log.info('done') | |
24 | 27 |
25 pin = board.get_pin('d:2:i') | 28 async def cb(data): |
26 pin.mode = PULLUP # config msg to send to firmata | 29 nonlocal last_report |
27 pin._mode = INPUT # fake value so pyfirmata still reads this pin | 30 _pinmode, _pin, value, t = data |
| | 31 pressed = not value |
| | 32 if pressed: |
| | 33 if last_report < t - MIN_REPEAT_SEC: |
| | 34 log.info(f'press {t}') |
| | 35 await press() |
| | 36 last_report = t |
28 | 37 |
29 log.info('running mqtt client') | 38 await my_board.set_pin_mode_digital_input_pullup(pin, cb) |
30 client.loop_start() | 39 await board.enable_digital_reporting(2) |
31 log.info('done') | 40 while True: |
32 | 41 await asyncio.sleep(KILL_TIME) |
33 client.publish('doorbell/online', '1') | |
34 | |
35 last_value = None | |
36 last_edge = 0 | |
37 | 42 |
38 | 43 |
39 def on_press(t): | 44 board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False) |
40 global last_edge | 45 |
41 if t > last_edge + MIN_REPEAT_SEC: | 46 cli: Optional[asyncio_mqtt.Client] =None |
42 log.info(f'press {t=}') | 47 |
43 client.publish('doorbell/button', 'press') | 48 async def press(): |
44 log.info('published') | 49 await cli.publish('doorbell/button', 'pressed') |
45 last_edge = t | 50 |
| | 51 async def status(request: Request) -> HTMLResponse: |
| | 52 return HTMLResponse("""<doctype HTML> |
| | 53 <html> |
| | 54 <head><title>doorbell</title></head> |
| | 55 <body> |
| | 56 <form method="POST" action="press"> |
| | 57 <button type="submit">test ring</button> |
| | 58 </form> |
| | 59 </body> |
| | 60 </html> |
| | 61 """) |
| | 62 |
| | 63 async def post_press(request: Request) -> RedirectResponse: |
| | 64 await press() |
| | 65 return RedirectResponse('.', status_code=303) |
46 | 66 |
47 | 67 |
48 try: | 68 |
49 while True: | 69 app = Starlette(routes=[ |
50 now = time.time() | 70 Route('/', status), |
51 board.iterate() | 71 Route('/press', post_press, methods=['POST']), |
52 value = pin.read() | 72 ]) |
53 if value == 0 and last_value == 1: | 73 |
54 on_press(now) | 74 |
55 last_value = value | 75 async def main(): |
56 client.loop() | 76 global cli |
57 except Exception: | 77 config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info") |
58 client.publish('doorbell/online', '0') | 78 server = uvicorn.Server(config) |
59 client.disconnect() | 79 |
60 raise | 80 online = 'doorbell/online' |
| | 81 will = asyncio_mqtt.Will(topic=online, payload='0') |
| | 82 async with asyncio_mqtt.Client( |
| | 83 "bang", |
| | 84 keepalive=10, # don't go below firmata startup time of 5sec |
| | 85 will=will) as client: |
| | 86 log.debug(f'with {client=}') |
| | 87 cli=client |
| | 88 await board.start_aio() |
| | 89 |
| | 90 t1 = asyncio.create_task(server.serve()) |
| | 91 t2 = asyncio.create_task(digital_in_pullup(board, 2)) |
| | 92 await client.publish(online, '1') |
| | 93 while True: |
| | 94 await asyncio.sleep(KILL_TIME) |
| | 95 |
| | 96 |
| | 97 asyncio.run(main(), debug=True) |