Mercurial > code > home > repos > doorbell
view doorbell_to_mqtt.py @ 5:696a46a1b239
logging and minor refactors
author | drewp@bigasterisk.com |
---|---|
date | Sun, 05 Feb 2023 14:26:04 -0800 |
parents | f822e7fe7120 |
children |
line wrap: on
line source
"""Bridge a doorbell button on an Arduino (via telemetrix) to MQTT.

Usage: doorbell_to_mqtt.py SERIAL_DEVICE

Each debounced button press publishes 'pressed' to the MQTT topic
'doorbell/button'.  A small web page on port 8000 offers a "test ring"
button that publishes the same message.  Liveness is announced on
'doorbell/online' ('1' once connected, '0' as the MQTT last-will).
"""
import asyncio
import logging
import sys
from typing import Optional

import asyncio_mqtt
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.routing import Route

import telemetrix_local as telemetrix_aio

DIGITAL_PIN = 2  # arduino pin number
KILL_TIME = 5  # sleep time to keep forever loop open
MQTT_RECONNECT_SEC = 5
MIN_REPEAT_SEC = 2

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

# Exactly one positional argument is expected: the serial device of the board.
if len(sys.argv) != 2:
    sys.exit(f'usage: {sys.argv[0]} SERIAL_DEVICE')
dev = sys.argv[1]


async def digital_in_pullup(my_board, pin):
    """Watch `pin` on `my_board` as a pulled-up input and report presses.

    Registers a callback for the pin, enables digital reporting for it, and
    then sleeps forever so the task stays alive.  A press is reported (via
    `press()`) only if more than MIN_REPEAT_SEC has passed, by the report's
    own timestamp, since the previously reported press.
    """
    last_report = 0

    async def cb(data):
        nonlocal last_report
        _pinmode, _pin, value, t = data
        # With the pull-up enabled the input reads low while the button is down.
        pressed = not value
        if not pressed:
            return
        if last_report < t - MIN_REPEAT_SEC:
            log.info(f'press {t}')
            await press()
            last_report = t

    await my_board.set_pin_mode_digital_input_pullup(pin, cb)
    # Use the board and pin we were given, not the module global / literal 2.
    await my_board.enable_digital_reporting(pin)

    while True:
        await asyncio.sleep(KILL_TIME)


board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False)

# Set by main() while the MQTT connection is open; None otherwise.
cli: Optional[asyncio_mqtt.Client] = None


async def press():
    """Publish a 'pressed' message to the 'doorbell/button' topic.

    Raises:
        RuntimeError: if the MQTT client has not been connected by main().
    """
    if cli is None:
        raise RuntimeError('MQTT client is not connected; cannot publish press')
    await cli.publish('doorbell/button', 'pressed')


async def status(request: Request) -> HTMLResponse:
    """Serve a minimal page with a form that POSTs to 'press'."""
    return HTMLResponse("""<!DOCTYPE html>
<html>
  <head><title>doorbell</title></head>
  <body>
    <form method="POST" action="press">
      <button type="submit">test ring</button>
    </form>
  </body>
</html>
""")


async def post_press(request: Request) -> RedirectResponse:
    """Simulate a button press, then redirect back to the status page."""
    await press()
    # 303 makes the browser follow up with a GET rather than re-POSTing.
    return RedirectResponse('.', status_code=303)


app = Starlette(routes=[
    Route('/', status),
    Route('/press', post_press, methods=['POST']),
])


async def main():
    """Connect to MQTT, start the board, and run the web server and pin watcher.

    Returns (or raises) as soon as either background task finishes, so that a
    failure in the web server or the pin watcher is surfaced instead of being
    hidden behind an endless sleep loop.
    """
    global cli
    config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info")
    server = uvicorn.Server(config)

    online = 'doorbell/online'
    will = asyncio_mqtt.Will(topic=online, payload='0')
    async with asyncio_mqtt.Client(
            "bang",
            keepalive=10,  # don't go below firmata startup time of 5sec
            will=will) as client:
        log.debug(f'with {client=}')
        cli = client
        tasks = []
        try:
            await board.start_aio()
            tasks.append(asyncio.create_task(server.serve()))
            tasks.append(
                asyncio.create_task(digital_in_pullup(board, DIGITAL_PIN)))
            await client.publish(online, '1')

            done, _pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                # Re-raise whatever made the task stop, if anything.
                task.result()
        finally:
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            # The client is about to disconnect; make press() fail clearly.
            cli = None


if __name__ == '__main__':
    asyncio.run(main(), debug=True)