view lcd_simulator.py @ 8:47795c3121f1

bufferless updates! mqtt message is sent over SPI and everything works
author drewp@bigasterisk.com
date Mon, 11 Mar 2024 01:37:57 -0700
parents d97a5930db7e
children
line wrap: on
line source

import asyncio
import logging
import struct

import aiomqtt
import pygame

# Size in pixels (width, height) of the simulator window.
WINDOW_SIZE = (320, 480)

# Display surface that the incoming updates are painted onto.
screen = pygame.display.set_mode(WINDOW_SIZE)
# Frame clock; not referenced by the code visible in this file.
clock = pygame.time.Clock()


# Header of one update message: head, x, y, w, h as unsigned 16-bit values.
# NOTE(review): "H" with no prefix means native byte order, so this matches
# the sender only when both ends share endianness -- confirm.
_UPDATE_HEADER = struct.Struct("HHHHH")
# Each pixel is three consecutive bytes, used as (r, g, b).
_BYTES_PER_PIXEL = 3


async def on_message(client):
    """Paint every update received on 'display/squib/updates' onto `screen`.

    A payload is a 10-byte header (head, x, y, w, h) followed by w*h pixels
    of 3 bytes each, laid out row by row and used as (r, g, b). For every
    message the target rectangle is first filled with yellow and shown for
    about 20 ms, then the pixel data is drawn over it and shown.

    Payloads that are shorter than the header, or shorter than the rectangle
    they declare, are logged and skipped so that one malformed message does
    not end the listener. Bytes beyond the declared rectangle are ignored.

    Args:
        client: a connected aiomqtt client. It is subscribed here and its
            ``messages`` iterator is consumed until it ends.
    """
    await client.subscribe('display/squib/updates')
    async for message in client.messages:
        payload = bytes(message.payload)
        if len(payload) < _UPDATE_HEADER.size:
            logging.warning(
                "dropping update: %d-byte payload is shorter than the "
                "%d-byte header", len(payload), _UPDATE_HEADER.size)
            continue
        _head, x, y, w, h = _UPDATE_HEADER.unpack_from(payload)

        start = _UPDATE_HEADER.size
        needed = w * h * _BYTES_PER_PIXEL
        pixels = payload[start:start + needed]
        if len(pixels) < needed:
            logging.warning(
                "dropping update: %dx%d rectangle needs %d pixel bytes, "
                "got %d", w, h, needed, len(pixels))
            continue

        # Show the rectangle in yellow first (presumably so the refreshed
        # region is noticeable), yielding to the event loop while it is up.
        pygame.draw.rect(screen, (255, 255, 0), (x, y, w, h))
        pygame.display.flip()
        await asyncio.sleep(.02)

        # One blit instead of w*h set_at() calls, so a large update does not
        # stall the event loop. Parts outside the window are clipped, just as
        # set_at() ignores out-of-range coordinates. frombuffer() rejects a
        # zero-sized image, hence the guard.
        if needed:
            tile = pygame.image.frombuffer(pixels, (w, h), "RGB")
            screen.blit(tile, (x, y))
        pygame.display.flip()


async def main():
    """Run the simulator until the window is closed or the listener stops.

    Connects to the MQTT broker at mqtt2.bigasterisk.com, starts
    `on_message` as a background task, and drains the pygame event queue
    about 30 times per second. Returns when a pygame.QUIT event arrives or
    when the listener task finishes.

    Raises:
        Whatever exception ended the `on_message` task, so a listener
        failure stops the program instead of going unnoticed.
    """
    async with aiomqtt.Client("mqtt2.bigasterisk.com") as client:
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, and an unreferenced task's exception is never surfaced.
        listener = asyncio.create_task(on_message(client))
        try:
            while not listener.done():
                for event in pygame.event.get():
                    if event.type == pygame.QUIT:
                        return
                await asyncio.sleep(1 / 30)
            # The listener ended by itself; re-raise its error, if any.
            listener.result()
        finally:
            # Stop the listener before the client connection is closed.
            listener.cancel()
            await asyncio.gather(listener, return_exceptions=True)


# Start the event loop only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())