Mercurial > code > home > repos > doorbell
diff doorbell_to_mqtt.py @ 2:f822e7fe7120
rewrite to use telemetrix (localized)
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:06:19 -0800 |
parents | 7bd85b962845 |
children |
line wrap: on
line diff
--- a/doorbell_to_mqtt.py Sun Feb 05 14:05:06 2023 -0800 +++ b/doorbell_to_mqtt.py Sun Feb 05 14:06:19 2023 -0800 @@ -1,60 +1,97 @@ -import time +import asyncio import logging import sys -import paho.mqtt.client as mqtt -from pyfirmata import Arduino, INPUT -PULLUP = 0x0b +import asyncio_mqtt +import uvicorn +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import HTMLResponse, RedirectResponse +from starlette.routing import Route +from typing import Optional +import telemetrix_local as telemetrix_aio -MIN_REPEAT_SEC = .5 +DIGITAL_PIN = 2 # arduino pin number +KILL_TIME = 5 # sleep time to keep forever loop open +MQTT_RECONNECT_SEC = 5 +MIN_REPEAT_SEC = 2 -logging.basicConfig(level=logging.INFO) +logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() dev, = sys.argv[1:] -client = mqtt.Client() -client.will_set('doorbell/online', '0', qos=0, retain=False) -client.connect('bang') -log.info(f'connecting to {dev}') -# note that I set StandardFirmata to double baud rate -board = Arduino(dev, baudrate=115200) -log.info('done') +async def digital_in_pullup(my_board, pin): + last_report = 0 -pin = board.get_pin('d:2:i') -pin.mode = PULLUP # config msg to send to firmata -pin._mode = INPUT # fake value so pyfirmata still reads this pin + async def cb(data): + nonlocal last_report + _pinmode, _pin, value, t = data + pressed = not value + if pressed: + if last_report < t - MIN_REPEAT_SEC: + log.info(f'press {t}') + await press() + last_report = t -log.info('running mqtt client') -client.loop_start() -log.info('done') - -client.publish('doorbell/online', '1') - -last_value = None -last_edge = 0 + await my_board.set_pin_mode_digital_input_pullup(pin, cb) + await board.enable_digital_reporting(2) + while True: + await asyncio.sleep(KILL_TIME) -def on_press(t): - global last_edge - if t > last_edge + MIN_REPEAT_SEC: - log.info(f'press {t=}') - client.publish('doorbell/button', 'press') - log.info('published') - last_edge = t +board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False) + +cli: Optional[asyncio_mqtt.Client] =None + +async def press(): + await cli.publish('doorbell/button', 'pressed') + +async def status(request: Request) -> HTMLResponse: + return HTMLResponse("""<doctype HTML> + <html> + <head><title>doorbell</title></head> + <body> + <form method="POST" action="press"> + <button type="submit">test ring</button> + </form> + </body> + </html> + """) + +async def post_press(request: Request) -> RedirectResponse: + await press() + return RedirectResponse('.', status_code=303) + -try: - while True: - now = time.time() - board.iterate() - value = pin.read() - if value == 0 and last_value == 1: - on_press(now) - last_value = value - client.loop() -except Exception: - client.publish('doorbell/online', '0') - client.disconnect() - raise \ No newline at end of file +app = Starlette(routes=[ + Route('/', status), + Route('/press', post_press, methods=['POST']), +]) + + +async def main(): + global cli + config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info") + server = uvicorn.Server(config) + + online = 'doorbell/online' + will = asyncio_mqtt.Will(topic=online, payload='0') + async with asyncio_mqtt.Client( + "bang", + keepalive=10, # don't go below firmata startup time of 5sec + will=will) as client: + log.debug(f'with {client=}') + cli=client + await board.start_aio() + + t1 = asyncio.create_task(server.serve()) + t2 = asyncio.create_task(digital_in_pullup(board, 2)) + await client.publish(online, '1') + while True: + await asyncio.sleep(KILL_TIME) + + +asyncio.run(main(), debug=True) \ No newline at end of file