2
|
1 import asyncio
|
0
|
2 import logging
|
|
3 import sys
|
|
4
|
2
|
5 import asyncio_mqtt
|
|
6 import uvicorn
|
|
7 from starlette.applications import Starlette
|
|
8 from starlette.requests import Request
|
|
9 from starlette.responses import HTMLResponse, RedirectResponse
|
|
10 from starlette.routing import Route
|
|
11 from typing import Optional
|
|
12 import telemetrix_local as telemetrix_aio
|
0
|
13
|
2
|
# Arduino pin number.
DIGITAL_PIN = 2

# Seconds slept per iteration by the loops that keep a coroutine open forever.
KILL_TIME = 5

# NOTE(review): not referenced anywhere in the visible code.
MQTT_RECONNECT_SEC = 5

# A press is only reported when the previous reported press is more than this
# many time units (presumably seconds -- confirm) older than the new one.
MIN_REPEAT_SEC = 2
|
0
|
18
|
2
|
# Configure the root logger at DEBUG level and use it for this script.
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

# Exactly one command-line argument is expected; it is later passed to the
# board as its com_port. Any other argument count raises ValueError here.
[dev] = sys.argv[1:]
|
|
23
|
|
24
|
2
|
async def digital_in_pullup(my_board, pin):
    """Watch a pulled-up digital input and call press() when it goes low.

    Registers a callback for ``pin`` on ``my_board``, enables digital
    reporting for that pin, then idles forever so the task stays alive.

    Args:
        my_board: board object providing the awaitable methods
            ``set_pin_mode_digital_input_pullup(pin, callback)`` and
            ``enable_digital_reporting(pin)``.
        pin: number of the digital pin to watch.

    This coroutine never returns on its own; cancel its task to stop it.
    """
    # Time value of the most recent reported press; 0 means "none yet".
    last_report = 0

    async def cb(data):
        """Handle one pin report, rate-limiting reported presses."""
        nonlocal last_report
        # Reports unpack to four fields; only the value and the time are used.
        _pinmode, _pin, value, t = data
        # With the pull-up enabled, a falsy reading is treated as "pressed".
        pressed = not value
        if not pressed:
            return
        # Drop reports that follow the last reported press too closely.
        if last_report < t - MIN_REPEAT_SEC:
            log.info(f'press {t}')
            await press()
            last_report = t

    await my_board.set_pin_mode_digital_input_pullup(pin, cb)
    # Use the board and pin that were passed in, not the module-level board
    # and a hard-coded pin number.
    await my_board.enable_digital_reporting(pin)
    while True:
        await asyncio.sleep(KILL_TIME)
|
0
|
42
|
|
43
|
2
|
# Board on the device given on the command line. autostart is disabled here;
# main() calls start_aio() on it explicitly.
board = telemetrix_aio.TelemetrixAIO(
    com_port=dev,
    autostart=False,
)

# MQTT client used by press(); stays None until main() stores a client in it.
cli: Optional[asyncio_mqtt.Client] = None
|
|
47
|
|
async def press():
    """Publish 'pressed' on the 'doorbell/button' MQTT topic.

    Raises:
        RuntimeError: if the module-level ``cli`` is still None, i.e. no
            MQTT client has been stored yet.
    """
    client = cli
    if client is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError('MQTT client is not connected')
    await client.publish('doorbell/button', 'pressed')
|
|
50
|
|
async def status(request: Request) -> HTMLResponse:
    """Serve a minimal HTML page with a "test ring" button.

    The page contains a single form that POSTs to the relative URL "press".

    Args:
        request: the incoming request; not inspected.

    Returns:
        An HTMLResponse containing the static page.
    """
    # "<doctype HTML>" is not a doctype declaration (it parses as an unknown
    # tag); the correct "<!DOCTYPE html>" keeps browsers out of quirks mode.
    return HTMLResponse("""<!DOCTYPE html>
<html>
  <head><title>doorbell</title></head>
  <body>
    <form method="POST" action="press">
      <button type="submit">test ring</button>
    </form>
  </body>
</html>
""")
|
|
62
|
|
async def post_press(request: Request) -> RedirectResponse:
    """Trigger a doorbell press, then redirect to '.' with status 303.

    Args:
        request: the incoming request; not inspected.

    Returns:
        A RedirectResponse to the relative URL '.' with status code 303.
    """
    await press()
    redirect = RedirectResponse('.', status_code=303)
    return redirect
|
|
66
|
0
|
67
|
|
68
|
2
|
# URL table: the status page at '/', and a POST-only '/press' endpoint.
_routes = [
    Route('/', status),
    Route('/press', post_press, methods=['POST']),
]

app = Starlette(routes=_routes)
|
|
73
|
|
74
|
|
async def main():
    """Run the HTTP server and the button watcher inside one MQTT session.

    Opens the MQTT client (with a will that publishes '0' to
    'doorbell/online'), stores it in the module-level ``cli`` for press(),
    starts the board, then runs the uvicorn server and the pin watcher as
    tasks and publishes '1' to 'doorbell/online'.

    Returns once either background task finishes; if that task failed, its
    exception is re-raised here instead of being silently dropped. The
    remaining task is cancelled and ``cli`` is reset to None on the way out.
    """
    global cli
    config = uvicorn.Config(app, host='0.0.0.0', port=8000, log_level="info")
    server = uvicorn.Server(config)

    online = 'doorbell/online'
    will = asyncio_mqtt.Will(topic=online, payload='0')
    async with asyncio_mqtt.Client(
            "bang",
            keepalive=10,  # don't go below firmata startup time of 5sec
            will=will) as client:
        log.debug(f'with {client=}')
        cli = client
        try:
            await board.start_aio()

            tasks = [
                asyncio.create_task(server.serve()),
                # Use the named constant rather than a bare pin number.
                asyncio.create_task(digital_in_pullup(board, DIGITAL_PIN)),
            ]
            try:
                await client.publish(online, '1')
                # Wake up as soon as either task ends, so a crashed server
                # or watcher stops the program instead of leaving it asleep.
                done, _pending = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED)
                for finished in done:
                    # Re-raises the task's exception, if it had one.
                    finished.result()
            finally:
                # Stop whatever is still running and wait for it to unwind;
                # cancelling an already finished task is a no-op.
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            # Never leave press() holding a client that is being closed.
            cli = None


asyncio.run(main(), debug=True)