Mercurial > code > home > repos > front-door-display
view web_to_mqtt.py @ 5:d97a5930db7e
closer
author | drewp@bigasterisk.com |
---|---|
date | Wed, 06 Mar 2024 16:38:58 -0800 |
parents | e273cc60b389 |
children | e36abecb48a1 |
line wrap: on
line source
import asyncio import itertools import random import struct import subprocess import tempfile import aiomqtt from PIL import Image, ImageChops class WebRenderer: def __init__(self): self.chrome_proc = subprocess.Popen(["google-chrome", "--headless"]) print("Chrome subprocess started.") async def capture_screenshot(self, url) -> Image.Image: out = tempfile.NamedTemporaryFile(suffix=".png", prefix='webrenderer_') screenshot_command = [ "google-chrome", "--headless", "--window-size=320,320", f"--screenshot={out.name}", url, ] subprocess.run(screenshot_command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return Image.open(out.name).convert('RGB') block = 320 // 10 dirtyQueue = {} async def check_for_changes(renderer, client, last_image): current_image = await renderer.capture_screenshot( "http://localhost:8002/front-door-display/scheduleLcd.html") diff_image = ImageChops.difference(last_image, current_image) for y in range(0, current_image.height, block): for x in range(0, current_image.width, block): box = (x, y, x + block, y + block) region = diff_image.crop(box) if region.getbbox(): dirtyQueue[(x, y)] = current_image.crop(box) await asyncio.sleep(0) return current_image async def sendDirty(client): while True: if dirtyQueue: # pos = random.choice(list(dirtyQueue.keys())) pos = min(list(dirtyQueue.keys())) img = dirtyQueue.pop(pos) await tell_lcd(client, pos[0], pos[1], img) await asyncio.sleep(.15) framesSent = itertools.count() async def tell_lcd(client: aiomqtt.Client, x: int, y: int, region: Image.Image): seq = next(framesSent) msg = struct.pack('HHHHH', seq, x, y, region.width, region.height) + region.tobytes() print(f'send {seq=} {x=} {y=} {region.width=} {region.height=} ', end='\r', flush=True) await client.publish('display/squib/updates', msg, qos=0) async def main(): # also listen for dirty msgs from the web page; see ts. # also get notified of new mqtt listeners who need a full image refresh. renderer = WebRenderer() async with aiomqtt.Client("mqtt2") as client: asyncio.create_task(sendDirty(client)) last_image = Image.new('RGB', (320, 320)) while True: last_image = await check_for_changes(renderer, client, last_image) # we could get the web page to tell us when any dom changes await asyncio.sleep(5) if __name__ == "__main__": asyncio.run(main())