view doorbell_to_mqtt.py @ 5:696a46a1b239

logging and minor refactors
author drewp@bigasterisk.com
date Sun, 05 Feb 2023 14:26:04 -0800
parents f822e7fe7120
children
line wrap: on
line source

import asyncio
import logging
import sys

import asyncio_mqtt
import uvicorn
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import HTMLResponse, RedirectResponse
from starlette.routing import Route
from typing import Optional
import telemetrix_local as telemetrix_aio

DIGITAL_PIN = 2  # arduino pin number
KILL_TIME = 5  # sleep time to keep forever loop open
MQTT_RECONNECT_SEC = 5
MIN_REPEAT_SEC = 2

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

dev, = sys.argv[1:]


async def digital_in_pullup(my_board, pin):
    last_report = 0

    async def cb(data):
        nonlocal last_report
        _pinmode, _pin, value, t = data
        pressed = not value
        if pressed:
            if last_report < t - MIN_REPEAT_SEC:
                log.info(f'press {t}')
                await press()
                last_report = t

    await my_board.set_pin_mode_digital_input_pullup(pin, cb)
    await board.enable_digital_reporting(2)
    while True:
        await asyncio.sleep(KILL_TIME)


board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False)

cli: Optional[asyncio_mqtt.Client] =None

async def press():
    await cli.publish('doorbell/button', 'pressed')

async def status(request: Request) -> HTMLResponse:
    return HTMLResponse("""<doctype HTML>
    <html>
    <head><title>doorbell</title></head>
    <body>
    <form method="POST" action="press">
      <button type="submit">test ring</button>
    </form>
    </body>
    </html>
    """)

async def post_press(request: Request) -> RedirectResponse:
    await press()
    return RedirectResponse('.', status_code=303)



app = Starlette(routes=[
    Route('/', status),
    Route('/press', post_press, methods=['POST']),
])


async def main():
    global cli
    config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info")
    server = uvicorn.Server(config)

    online = 'doorbell/online'
    will = asyncio_mqtt.Will(topic=online, payload='0')
    async with asyncio_mqtt.Client(
            "bang",
            keepalive=10,  # don't go below firmata startup time of 5sec
            will=will) as client:
        log.debug(f'with {client=}')
        cli=client
        await board.start_aio()

        t1 = asyncio.create_task(server.serve())
        t2 = asyncio.create_task(digital_in_pullup(board, 2))
        await client.publish(online, '1')
        while True:
            await asyncio.sleep(KILL_TIME)


asyncio.run(main(), debug=True)