comparison doorbell_to_mqtt.py @ 2:f822e7fe7120

rewrite to use telemetrix (localized)
author drewp@bigasterisk.com
date Sun, 05 Feb 2023 14:06:19 -0800
parents 7bd85b962845
children
comparison
equal deleted inserted replaced
1:125c794511a6 2:f822e7fe7120
1 import time 1 import asyncio
2 import logging 2 import logging
3 import sys 3 import sys
4 import paho.mqtt.client as mqtt
5 from pyfirmata import Arduino, INPUT
6 4
7 PULLUP = 0x0b 5 import asyncio_mqtt
6 import uvicorn
7 from starlette.applications import Starlette
8 from starlette.requests import Request
9 from starlette.responses import HTMLResponse, RedirectResponse
10 from starlette.routing import Route
11 from typing import Optional
12 import telemetrix_local as telemetrix_aio
8 13
9 MIN_REPEAT_SEC = .5 14 DIGITAL_PIN = 2 # arduino pin number
15 KILL_TIME = 5 # sleep time to keep forever loop open
16 MQTT_RECONNECT_SEC = 5
17 MIN_REPEAT_SEC = 2
10 18
11 logging.basicConfig(level=logging.INFO) 19 logging.basicConfig(level=logging.DEBUG)
12 log = logging.getLogger() 20 log = logging.getLogger()
13 21
14 dev, = sys.argv[1:] 22 dev, = sys.argv[1:]
15 23
16 client = mqtt.Client()
17 client.will_set('doorbell/online', '0', qos=0, retain=False)
18 client.connect('bang')
19 24
20 log.info(f'connecting to {dev}') 25 async def digital_in_pullup(my_board, pin):
21 # note that I set StandardFirmata to double baud rate 26 last_report = 0
22 board = Arduino(dev, baudrate=115200)
23 log.info('done')
24 27
25 pin = board.get_pin('d:2:i') 28 async def cb(data):
26 pin.mode = PULLUP # config msg to send to firmata 29 nonlocal last_report
27 pin._mode = INPUT # fake value so pyfirmata still reads this pin 30 _pinmode, _pin, value, t = data
31 pressed = not value
32 if pressed:
33 if last_report < t - MIN_REPEAT_SEC:
34 log.info(f'press {t}')
35 await press()
36 last_report = t
28 37
29 log.info('running mqtt client') 38 await my_board.set_pin_mode_digital_input_pullup(pin, cb)
30 client.loop_start() 39 await board.enable_digital_reporting(2)
31 log.info('done') 40 while True:
32 41 await asyncio.sleep(KILL_TIME)
33 client.publish('doorbell/online', '1')
34
35 last_value = None
36 last_edge = 0
37 42
38 43
39 def on_press(t): 44 board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False)
40 global last_edge 45
41 if t > last_edge + MIN_REPEAT_SEC: 46 cli: Optional[asyncio_mqtt.Client] =None
42 log.info(f'press {t=}') 47
43 client.publish('doorbell/button', 'press') 48 async def press():
44 log.info('published') 49 await cli.publish('doorbell/button', 'pressed')
45 last_edge = t 50
51 async def status(request: Request) -> HTMLResponse:
52 return HTMLResponse("""<doctype HTML>
53 <html>
54 <head><title>doorbell</title></head>
55 <body>
56 <form method="POST" action="press">
57 <button type="submit">test ring</button>
58 </form>
59 </body>
60 </html>
61 """)
62
63 async def post_press(request: Request) -> RedirectResponse:
64 await press()
65 return RedirectResponse('.', status_code=303)
46 66
47 67
48 try: 68
49 while True: 69 app = Starlette(routes=[
50 now = time.time() 70 Route('/', status),
51 board.iterate() 71 Route('/press', post_press, methods=['POST']),
52 value = pin.read() 72 ])
53 if value == 0 and last_value == 1: 73
54 on_press(now) 74
55 last_value = value 75 async def main():
56 client.loop() 76 global cli
57 except Exception: 77 config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info")
58 client.publish('doorbell/online', '0') 78 server = uvicorn.Server(config)
59 client.disconnect() 79
60 raise 80 online = 'doorbell/online'
81 will = asyncio_mqtt.Will(topic=online, payload='0')
82 async with asyncio_mqtt.Client(
83 "bang",
84 keepalive=10, # don't go below firmata startup time of 5sec
85 will=will) as client:
86 log.debug(f'with {client=}')
87 cli=client
88 await board.start_aio()
89
90 t1 = asyncio.create_task(server.serve())
91 t2 = asyncio.create_task(digital_in_pullup(board, 2))
92 await client.publish(online, '1')
93 while True:
94 await asyncio.sleep(KILL_TIME)
95
96
97 asyncio.run(main(), debug=True)