Mercurial > code > home > repos > rdferry
view examples/_run_server_child.py @ 11:0bc06da6bf74
start ferry1 patch protocol
author | drewp@bigasterisk.com |
---|---|
date | Mon, 18 Mar 2024 16:42:21 -0700 |
parents | 52e1bb1532f2 |
children |
line wrap: on
line source
import asyncio import contextlib import logging from dataclasses import dataclass from datetime import timedelta from pathlib import Path import aiohttp from aiohttp_sse_client import client as sse_client log = logging.getLogger('chil') @dataclass class RunHttpServerChildProcess: server_path: Path async def __aenter__(self): self.subprocess = await asyncio.create_subprocess_exec( 'pdm', 'run', 'python', self.server_path) self._session = await aiohttp.ClientSession().__aenter__() return self async def __aexit__(self, exc_type, exc, tb): await self._session.close() self.subprocess.terminate() await self.subprocess.wait() async def get(self, url: str, headers=None) -> aiohttp.ClientResponse: while True: try: return await self._session.get(url, headers=headers) except aiohttp.ClientConnectorError: await asyncio.sleep(0.05) @contextlib.asynccontextmanager async def eventSource(self, url: str): async with sse_client.EventSource( url, reconnection_time=timedelta(seconds=.05)) as es: yield es async def assert_event_stream_starts_with(http_server, url, expected_events): events_left = expected_events[:] async with http_server.eventSource(url) as es: async for event in es: assert (event.message, event.data) == events_left[0] events_left.pop(0) if not events_left: break