Mercurial > code > home > repos > homeauto
view espNode/readcam.py @ 1742:1c1b38b145f8
rm custom cpp fingerprint driver that's under the wrong name 'desk'
author | drewp@bigasterisk.com |
---|---|
date | Fri, 01 Sep 2023 17:15:25 -0700 |
parents | c77b5ab7b99d |
children |
line wrap: on
line source
#!camtest/bin/python3
# Receive camera frames from an esphome node, look for an apriltag in each
# frame, and serve the frames plus the detected tag center to a browser over
# server-sent events.
import asyncio
import binascii
import io
import json
import logging
import os
import time

import apriltag
import cv2
import numpy
from aioesphomeapi.client import APIClient
from aioesphomeapi.model import CameraState
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from docopt import docopt

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


class CameraReceiver:
    """Collects jpeg frames pushed by an esphome camera and hands them out.

    Frames arrive through `on_state` and are queued in `self.recent`;
    `frames()` drains that queue, runs `analyze` on each frame and yields
    the result.
    """

    def __init__(self, host):
        self.lastFrameTime = None  # time.time() of the last frame handed out
        self.host = host
        self.lastFrame = b"", ''  # (jpg bytes, analysis msg) of that frame
        self.recent = []  # queued jpeg payloads, oldest first

    async def start(self):
        """Connect to the esphome API and begin requesting image streams.

        On OSError the failure is logged and the method returns without
        starting the request loop.
        """
        try:
            # NOTE(review): port and password are hard-coded here.
            self.c = c = APIClient(self.host, 6053, 'MyPassword')
            await c.connect(login=True)
            await c.subscribe_states(on_state=self.on_state)
        except OSError as e:
            log.error('could not connect to camera %r: %r', self.host, e)
            return
        # Keep the task on self so it is not garbage-collected.
        self.t = asyncio.create_task(
            self.start_requesting_image_stream_forever())

    async def start_requesting_image_stream_forever(self):
        """Re-request the image stream every 4 seconds until it fails."""
        while True:
            try:
                await self.c.request_image_stream()
            except AttributeError as e:
                # NOTE(review): presumably raised once the client has lost
                # its connection -- confirm against aioesphomeapi.
                log.warning('stopping image stream requests: %r', e)
                return
            # https://github.com/esphome/esphome/blob/dev/esphome/components/esp32_camera/esp32_camera.cpp#L265 says a 'stream' is 5 sec long
            await asyncio.sleep(4)

    def on_state(self, s):
        """State callback: queue camera frames, print anything else."""
        if isinstance(s, CameraState):
            jpg = s.data
            # Trim before appending, so the queue holds at most 11 frames.
            if len(self.recent) > 10:
                self.recent = self.recent[-10:]
            self.recent.append(jpg)
        else:
            print('other on_state', s)

    def analyze(self, jpg):
        """Return {'center': [x, y]} for the first detected tag, else {}.

        An empty or undecodable jpeg yields {} instead of being passed on to
        the detector.
        """
        msg = {}
        if not jpg:
            return msg
        img = cv2.imdecode(numpy.asarray(bytearray(jpg)),
                           cv2.IMREAD_GRAYSCALE)
        if img is None:
            # imdecode gives back None when it cannot decode the buffer.
            return msg
        result = detector.detect(img)
        if result:
            center = result[0].center
            msg['center'] = [round(center[0], 2), round(center[1], 2)]
        return msg

    async def frames(self):
        """Yield (jpg, msg) for each queued frame, polling while idle.

        After the consumer resumes the generator, the yielded frame is
        recorded as `lastFrame` along with the current time.

        NOTE(review): the 15 second staleness check only runs when a frame
        is waiting in the queue, so it fires on the first frame after a long
        gap rather than during the gap itself. Every caller pops from the
        same queue, so concurrent consumers split the frames between them.
        """
        while True:
            if self.recent:
                if (self.lastFrameTime and
                        time.time() - self.lastFrameTime > 15):
                    print('no recent frames')
                    os.abort()

                jpg = self.recent.pop(0)
                msg = self.analyze(jpg)
                yield jpg, msg
                self.lastFrame = jpg, msg
                self.lastFrameTime = time.time()
            else:
                await asyncio.sleep(.05)


def imageUri(jpg):
    """Return the jpeg bytes as a base64 `data:` URI."""
    # newline=False: b2a_base64 otherwise appends '\n', which would end up
    # inside the URI and add an empty data line to the SSE event.
    encoded = binascii.b2a_base64(jpg, newline=False).decode('ascii')
    return 'data:image/jpeg;base64,' + encoded


async def stream(request):
    """SSE handler: send the cached frame (if any), then every new frame.

    Each frame is sent as two events: the image data URI and the JSON
    analysis message.
    """
    async with sse_response(request) as resp:
        last_jpg, last_msg = recv.lastFrame
        # Before the first frame there is nothing useful to send; the
        # placeholder would reach the page as an empty image.
        if last_jpg:
            await resp.send(imageUri(last_jpg))
            await resp.send(json.dumps(last_msg))
        async for frame, msg in recv.frames():
            await resp.send(json.dumps(msg))
            await resp.send(imageUri(frame))
    return resp


async def index(request):
    """Serve the viewer page that displays the stream and the tag marker."""
    d = r"""
<html>
  <body>
    <style>
    #center {
      position: absolute;
      font-size: 35px;
      color: orange;
      text-shadow: black 0 1px 1px;
      margin-left: -14px;
      margin-top: -23px;
    }
    </style>
    <script>
      var evtSource = new EventSource("/stream");
      evtSource.onmessage = function(e) {
        if (e.data[0] == '{') {
          const msg = JSON.parse(e.data);
          const st = document.querySelector('#center').style;
          if (msg.center) {
            st.left = msg.center[0];
            st.top = msg.center[1];
          } else {
            st.left = -999;
          }
        } else {
          document.getElementById('response').src = e.data;
        }
      }
    </script>
    <h1>Response from server:</h1>
    <div style="position: relative">
      <img id="response"></img>
      <span id="center" style="position: absolute">◎</span>
    </div>
  </body>
</html>
"""
    return Response(text=d, content_type='text/html')


arguments = docopt('''
this

Usage:
  this [-v] [--cam host] [--port to_serve]

Options:
  --port n    http server [default: 10020]
  --cam host  hostname of esphome server
''')

logging.getLogger('aioesphomeapi.connection').setLevel(logging.INFO)

recv = CameraReceiver(arguments['--cam'])
detector = apriltag.Detector()

f = recv.start()

# Strong references to running startup tasks: asyncio itself only keeps weak
# references, so an unreferenced task can be collected before it finishes.
background_tasks = set()


async def starter(app):
    """aiohttp startup hook: run the camera connection in the background."""
    task = asyncio.create_task(f)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)


start_time = time.time()
app = web.Application()
app.on_startup.append(starter)
app.router.add_route('GET', '/stream', stream)
app.router.add_route('GET', '/', index)
web.run_app(app, port=int(arguments['--port']))