diff doorbell_to_mqtt.py @ 2:f822e7fe7120

rewrite to use telemetrix (localized)
author drewp@bigasterisk.com
date Sun, 05 Feb 2023 14:06:19 -0800
parents 7bd85b962845
children
line wrap: on
line diff
--- a/doorbell_to_mqtt.py	Sun Feb 05 14:05:06 2023 -0800
+++ b/doorbell_to_mqtt.py	Sun Feb 05 14:06:19 2023 -0800
@@ -1,60 +1,97 @@
-import time
+import asyncio
 import logging
 import sys
-import paho.mqtt.client as mqtt
-from pyfirmata import Arduino, INPUT
 
-PULLUP = 0x0b
+import asyncio_mqtt
+import uvicorn
+from starlette.applications import Starlette
+from starlette.requests import Request
+from starlette.responses import HTMLResponse, RedirectResponse
+from starlette.routing import Route
+from typing import Optional
+import telemetrix_local as telemetrix_aio
 
-MIN_REPEAT_SEC = .5
+DIGITAL_PIN = 2  # arduino pin number
+KILL_TIME = 5  # sleep time to keep forever loop open
+MQTT_RECONNECT_SEC = 5
+MIN_REPEAT_SEC = 2
 
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
 log = logging.getLogger()
 
 dev, = sys.argv[1:]
 
-client = mqtt.Client()
-client.will_set('doorbell/online', '0', qos=0, retain=False)
-client.connect('bang')
 
-log.info(f'connecting to {dev}')
-# note that I set StandardFirmata to double baud rate
-board = Arduino(dev, baudrate=115200)
-log.info('done')
+async def digital_in_pullup(my_board, pin):
+    last_report = 0
 
-pin = board.get_pin('d:2:i')
-pin.mode = PULLUP  # config msg to send to firmata
-pin._mode = INPUT  # fake value so pyfirmata still reads this pin
+    async def cb(data):
+        nonlocal last_report
+        _pinmode, _pin, value, t = data
+        pressed = not value
+        if pressed:
+            if last_report < t - MIN_REPEAT_SEC:
+                log.info(f'press {t}')
+                await press()
+                last_report = t
 
-log.info('running mqtt client')
-client.loop_start()
-log.info('done')
-
-client.publish('doorbell/online', '1')
-
-last_value = None
-last_edge = 0
+    await my_board.set_pin_mode_digital_input_pullup(pin, cb)
+    await board.enable_digital_reporting(2)
+    while True:
+        await asyncio.sleep(KILL_TIME)
 
 
-def on_press(t):
-    global last_edge
-    if t > last_edge + MIN_REPEAT_SEC:
-        log.info(f'press {t=}')
-        client.publish('doorbell/button', 'press')
-        log.info('published')
-    last_edge = t
+board = telemetrix_aio.TelemetrixAIO(com_port=dev, autostart=False)
+
+cli: Optional[asyncio_mqtt.Client] =None
+
+async def press():
+    await cli.publish('doorbell/button', 'pressed')
+
+async def status(request: Request) -> HTMLResponse:
+    return HTMLResponse("""<doctype HTML>
+    <html>
+    <head><title>doorbell</title></head>
+    <body>
+    <form method="POST" action="press">
+      <button type="submit">test ring</button>
+    </form>
+    </body>
+    </html>
+    """)
+
+async def post_press(request: Request) -> RedirectResponse:
+    await press()
+    return RedirectResponse('.', status_code=303)
+
 
 
-try:
-    while True:
-        now = time.time()
-        board.iterate()
-        value = pin.read()
-        if value == 0 and last_value == 1:
-            on_press(now)
-        last_value = value
-        client.loop()
-except Exception:
-    client.publish('doorbell/online', '0')
-    client.disconnect()
-    raise
\ No newline at end of file
+app = Starlette(routes=[
+    Route('/', status),
+    Route('/press', post_press, methods=['POST']),
+])
+
+
+async def main():
+    global cli
+    config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info")
+    server = uvicorn.Server(config)
+
+    online = 'doorbell/online'
+    will = asyncio_mqtt.Will(topic=online, payload='0')
+    async with asyncio_mqtt.Client(
+            "bang",
+            keepalive=10,  # don't go below firmata startup time of 5sec
+            will=will) as client:
+        log.debug(f'with {client=}')
+        cli=client
+        await board.start_aio()
+
+        t1 = asyncio.create_task(server.serve())
+        t2 = asyncio.create_task(digital_in_pullup(board, 2))
+        await client.publish(online, '1')
+        while True:
+            await asyncio.sleep(KILL_TIME)
+
+
+asyncio.run(main(), debug=True)
\ No newline at end of file