view examples/_run_server_child.py @ 8:25538e3ee531

rename
author drewp@bigasterisk.com
date Sat, 16 Mar 2024 12:51:39 -0700
parents examples/run_server_make_request.py@d885fce5e4e7
children 52e1bb1532f2
line wrap: on
line source

import asyncio
from dataclasses import dataclass
from pathlib import Path

import aiohttp


@dataclass
class RunHttpServerChildProcess:
    server_path: Path

    async def __aenter__(self):
        self.subprocess = await asyncio.create_subprocess_exec(
            'pdm', 'run', 'python', self.server_path)
        self._session = await aiohttp.ClientSession().__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._session.close()
        self.subprocess.terminate()
        await self.subprocess.wait()

    async def get(self, url: str) -> aiohttp.ClientResponse:
        while True:
            try:
                return await self._session.get(url)
            except aiohttp.ClientConnectorError:
                await asyncio.sleep(0.05)