view examples/_run_server_child.py @ 10:52e1bb1532f2

serve_inline_graph
author drewp@bigasterisk.com
date Sat, 16 Mar 2024 16:02:23 -0700
parents 25538e3ee531
children 0bc06da6bf74
line wrap: on
line source

import asyncio
from dataclasses import dataclass
from pathlib import Path

import aiohttp


@dataclass
class RunHttpServerChildProcess:
    server_path: Path

    async def __aenter__(self):
        self.subprocess = await asyncio.create_subprocess_exec(
            'pdm', 'run', 'python', self.server_path)
        self._session = await aiohttp.ClientSession().__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._session.close()
        self.subprocess.terminate()
        await self.subprocess.wait()

    async def get(self, url: str, headers=None) -> aiohttp.ClientResponse:
        while True:
            try:
                return await self._session.get(url, headers=headers)
            except aiohttp.ClientConnectorError:
                await asyncio.sleep(0.05)