view examples/_run_server_child.py @ 11:0bc06da6bf74

start ferry1 patch protocol
author drewp@bigasterisk.com
date Mon, 18 Mar 2024 16:42:21 -0700
parents 52e1bb1532f2
children
line wrap: on
line source

import asyncio
import contextlib
import logging
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path

import aiohttp
from aiohttp_sse_client import client as sse_client

log = logging.getLogger('chil')


@dataclass
class RunHttpServerChildProcess:
    server_path: Path

    async def __aenter__(self):
        self.subprocess = await asyncio.create_subprocess_exec(
            'pdm', 'run', 'python', self.server_path)
        self._session = await aiohttp.ClientSession().__aenter__()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self._session.close()
        self.subprocess.terminate()
        await self.subprocess.wait()

    async def get(self, url: str, headers=None) -> aiohttp.ClientResponse:
        while True:
            try:
                return await self._session.get(url, headers=headers)
            except aiohttp.ClientConnectorError:
                await asyncio.sleep(0.05)

    @contextlib.asynccontextmanager
    async def eventSource(self, url: str):
        async with sse_client.EventSource(
                url, reconnection_time=timedelta(seconds=.05)) as es:
            yield es


async def assert_event_stream_starts_with(http_server, url, expected_events):
    events_left = expected_events[:]
    async with http_server.eventSource(url) as es:
        async for event in es:
            assert (event.message, event.data) == events_left[0]
            events_left.pop(0)
            if not events_left:
                break