view examples/serve_inline_graph_test.py @ 11:0bc06da6bf74

start ferry1 patch protocol
author drewp@bigasterisk.com
date Mon, 18 Mar 2024 16:42:21 -0700
parents 52e1bb1532f2
children ba73d8ba81dc
line wrap: on
line source

from pathlib import Path
import logging
import pytest

from examples._run_server_child import RunHttpServerChildProcess, assert_event_stream_starts_with

log = logging.getLogger('test')
logging.basicConfig(level=logging.INFO)
server_path = Path('examples/serve_inline_graph.py')


@pytest.mark.asyncio
async def test_server_returns_n3():
    async with RunHttpServerChildProcess(server_path) as http_server:
        response = await http_server.get('http://localhost:8005/g1')
        assert (await response.text()
                ) == '''@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .

<http://example.com/process> {
    <http://example.com/greeting> rdfs:label "hello world" .
}

'''


@pytest.mark.asyncio
async def test_server_returns_trig():
    async with RunHttpServerChildProcess(server_path) as http_server:
        response = await http_server.get(
            'http://localhost:8005/g1',
            headers={'accept': "application/n-quads"})
        assert (
            await response.text()
        ) == '''<http://example.com/greeting> <http://www.w3.org/2000/01/rdf-schema#label> "hello world" <http://example.com/process> .

'''


@pytest.mark.asyncio
async def test_server_returns_startup_events():
    async with RunHttpServerChildProcess(server_path) as http_server:
        await assert_event_stream_starts_with(
            http_server,
            'http://localhost:8005/g1/events',
            expected_events=[
                ('clear', 'ferry1'),
                ('patch',
                 '-\n+\n["http://example.com/greeting", "http://www.w3.org/2000/01/rdf-schema#label", "hello world", "http://www.w3.org/2001/XMLSchema#string", "", "http://example.com/process"]'
                 ),
            ])