Mercurial > code > home > repos > front-door-display
view web_to_mqtt.py @ 11:affb3c8f3f58
schedulelcd progress
author | drewp@bigasterisk.com |
---|---|
date | Thu, 06 Jun 2024 14:49:04 -0700 |
parents | 47795c3121f1 |
children |
line wrap: on
line source
"""Render a web page with headless Chrome and push changed tiles to an LCD over MQTT.

The page is screenshotted periodically, compared tile-by-tile with the previous
screenshot, and every tile that differs is queued and published on the
'display/squib/updates' topic.
"""
import asyncio
import itertools
import random
import struct
import subprocess
import tempfile

import aiomqtt
from PIL import Image, ImageChops


class WebRenderer:
    """Takes screenshots of a URL by running headless Chrome."""

    def __init__(self):
        # NOTE(review): nothing in this file communicates with this process;
        # every screenshot launches its own Chrome. It is kept to preserve the
        # existing startup behavior -- confirm whether it is still needed.
        self.chrome_proc = subprocess.Popen(["google-chrome", "--headless"])
        print("Chrome subprocess started.")

    def close(self) -> None:
        """Terminate the Chrome process started in __init__, if still running."""
        if self.chrome_proc.poll() is not None:
            return
        self.chrome_proc.terminate()
        try:
            self.chrome_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self.chrome_proc.kill()

    async def capture_screenshot(self, url) -> Image.Image:
        """Screenshot `url` in a 480x320 window and return it as an RGB image.

        The result is rotated by -90 degrees with expand=True, so the returned
        image is 320 wide by 480 tall.
        """
        with tempfile.NamedTemporaryFile(suffix=".png", prefix='webrenderer_') as out:
            screenshot_command = [
                "google-chrome",
                "--headless",
                "--window-size=480,320",
                f"--screenshot={out.name}",
                url,
            ]
            # Run Chrome without blocking the event loop, so the sendDirty
            # task keeps draining its queue while the page renders.
            proc = await asyncio.create_subprocess_exec(
                *screenshot_command,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
            )
            await proc.wait()
            # convert() reads the pixel data, so the result no longer depends
            # on the temp file once both `with` blocks exit.
            with Image.open(out.name) as shot:
                return shot.convert('RGB').rotate(-90, expand=True)


# Tile size, in pixels, used when diffing and sending screen regions.
blockX = 32
blockY = 32
# Seconds to wait between sender iterations.
msgDelay = .05
# Pending tiles keyed by their (x, y) top-left corner. A newer crop of the
# same tile overwrites an unsent older one.
dirtyQueue = {}


async def check_for_changes(renderer, client, last_image):
    """Take a new screenshot and queue every tile that differs from `last_image`.

    `client` is accepted for call compatibility but is not used here.
    Returns the new screenshot so the caller can pass it in next time.
    """
    current_image = await renderer.capture_screenshot(
        "http://localhost:8002/front-door-display/scheduleLcd.html")
    diff_image = ImageChops.difference(last_image, current_image)
    width, height = current_image.size
    for y in range(0, height, blockY):
        for x in range(0, width, blockX):
            # Clamp to the image bounds so edge tiles are never padded when
            # the image size is not a multiple of the block size.
            box = (x, y, min(x + blockX, width), min(y + blockY, height))
            # getbbox() is None when the diff tile is entirely zero.
            if diff_image.crop(box).getbbox():
                dirtyQueue[(x, y)] = current_image.crop(box)
        # Yield once per tile row so the sender task can run during the scan.
        await asyncio.sleep(0)
    return current_image


async def sendDirty(client):
    """Forever: publish one queued tile per iteration, lowest (x, y) key first."""
    while True:
        if dirtyQueue:
            # Alternative order: random.choice(list(dirtyQueue))
            pos = min(dirtyQueue)
            img = dirtyQueue.pop(pos)
            await tell_lcd(client, pos[0], pos[1], img)
        await asyncio.sleep(msgDelay)  # too fast and esp restarts


# Running count of tiles sent; reduced to 16 bits for the message header.
framesSent = itertools.count()


async def tell_lcd(client: aiomqtt.Client, x: int, y: int, region: Image.Image):
    """Publish one tile to 'display/squib/updates'.

    The payload is five unsigned 16-bit values (seq, x, y, width, height) in
    native byte order, followed by the tile's raw pixel bytes.
    """
    # Wrap at 2**16 so the counter covers the full range of the 'H' field.
    seq = next(framesSent) % 65536
    msg = struct.pack('HHHHH', seq, x, y, region.width, region.height) + region.tobytes()
    print(f'send {seq=} {x=} {y=} {region.width=} {region.height=} ', end='\r', flush=True)
    await client.publish('display/squib/updates', msg, qos=0)


async def main():
    """Connect to MQTT, start the sender task, and rescan the page every 60 seconds."""
    # also listen for dirty msgs from the web page; see ts.
    # also get notified of new mqtt listeners who need a full image refresh.
    renderer = WebRenderer()
    try:
        async with aiomqtt.Client("mqtt2") as client:
            # Keep a reference: the event loop only holds weak references to
            # tasks, so an unreferenced task may be garbage-collected.
            sender = asyncio.create_task(sendDirty(client))
            try:
                # Start from an all-black frame so the first scan queues every
                # tile that is not black.
                last_image = Image.new('RGB', (320, 480))
                while True:
                    if sender.done():
                        # Re-raise whatever stopped the sender instead of
                        # continuing to queue tiles that will never be sent.
                        sender.result()
                    last_image = await check_for_changes(renderer, client, last_image)
                    # we could get the web page to tell us when any dom changes
                    await asyncio.sleep(60)
            finally:
                sender.cancel()
    finally:
        renderer.close()


if __name__ == "__main__":
    asyncio.run(main())