view examples/run_server_make_request.py @ 5:b34cd6619316

new test. server has to be able to exit between tests.
author drewp@bigasterisk.com
date Sat, 16 Mar 2024 12:11:32 -0700
parents b5afa9363c3b
children d885fce5e4e7
line wrap: on
line source

import asyncio
from dataclasses import dataclass
from pathlib import Path

import aiohttp


@dataclass
class HttpServer:
    """Async context manager that runs a server script and issues requests to it.

    Entering the context launches ``pdm run python <server_path>`` as a child
    process and opens an aiohttp client session. Leaving the context closes
    the session and terminates the child process.
    """
    server_path: Path

    # Set by __aenter__ (not dataclass fields):
    #   self.subprocess -- the asyncio subprocess running the server script
    #   self._session   -- the aiohttp.ClientSession used by get()

    async def __aenter__(self) -> 'HttpServer':
        """Start the server process and open the client session."""
        self.subprocess = await asyncio.create_subprocess_exec(
            'pdm', 'run', 'python', self.server_path)
        try:
            self._session = await aiohttp.ClientSession().__aenter__()
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so the child
            # must be stopped here or it would be left running.
            await self._stop_subprocess()
            raise
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the client session, then stop the server process."""
        try:
            await self._session.close()
        finally:
            # Stop the child even if closing the session failed.
            await self._stop_subprocess()

    async def _stop_subprocess(self) -> None:
        """Terminate the server process if it is still running and reap it."""
        if self.subprocess.returncode is None:
            try:
                self.subprocess.terminate()
            except ProcessLookupError:
                # The process exited between the returncode check and the
                # terminate() call; there is nothing left to signal.
                pass
        await self.subprocess.wait()

    async def get(self, url: str) -> aiohttp.ClientResponse:
        """GET ``url``, retrying while the server is not yet accepting connections.

        Connection failures are retried every 50 ms so callers do not have to
        wait for the server to finish starting up. The caller owns the
        returned response.

        Raises:
            aiohttp.ClientConnectorError: if the connection fails and the
                server process has already exited, since retrying cannot
                succeed in that case.
        """
        while True:
            try:
                return await self._session.get(url)
            except aiohttp.ClientConnectorError:
                if self.subprocess.returncode is not None:
                    # Server is gone; fail instead of spinning forever.
                    raise
                await asyncio.sleep(0.05)