Mercurial > code > home > repos > rdferry
changeset 10:52e1bb1532f2
serve_inline_graph
author | drewp@bigasterisk.com |
---|---|
date | Sat, 16 Mar 2024 16:02:23 -0700 |
parents | b72f4ba1345d |
children | 0bc06da6bf74 |
files | examples/_run_server_child.py examples/serve_inline_graph.py examples/serve_inline_graph_test.py src/rdferry/__init__.py src/rdferry/patch/patch.py src/rdferry/patch_quads.py src/rdferry/patchablegraph.py src/rdferry/rdflib_issues/contains_with_context_398.py src/rdferry/server.py |
diffstat | 9 files changed, 214 insertions(+), 4 deletions(-) [+] |
line wrap: on
line diff
--- a/examples/_run_server_child.py Sat Mar 16 13:45:09 2024 -0700 +++ b/examples/_run_server_child.py Sat Mar 16 16:02:23 2024 -0700 @@ -20,9 +20,9 @@ self.subprocess.terminate() await self.subprocess.wait() - async def get(self, url: str) -> aiohttp.ClientResponse: + async def get(self, url: str, headers=None) -> aiohttp.ClientResponse: while True: try: - return await self._session.get(url) + return await self._session.get(url, headers=headers) except aiohttp.ClientConnectorError: await asyncio.sleep(0.05)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/serve_inline_graph.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,13 @@ +from rdferry import Patch, PatchableGraph, StarletteServer +from rdflib import RDFS, Literal, Namespace + +EX = Namespace('http://example.com/') + +server = StarletteServer() + +g1 = PatchableGraph() +g1.patch(Patch(adds=[(EX['greeting'], RDFS.label, Literal('hello world'), EX['process'])])) + +server.add_graph_routes('/g1', g1) + +server.serve()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/examples/serve_inline_graph_test.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,42 @@ +from pathlib import Path + +import pytest + +from examples._run_server_child import RunHttpServerChildProcess + +server_path = Path('examples/serve_inline_graph.py') + + +@pytest.mark.asyncio +async def test_server_returns_n3(): + async with RunHttpServerChildProcess(server_path) as http_server: + response = await http_server.get('http://localhost:8005/g1') + assert (await response.text() + ) == '''@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . + +<http://example.com/process> { + <http://example.com/greeting> rdfs:label "hello world" . +} + +''' + + +@pytest.mark.asyncio +async def test_server_returns_trig(): + async with RunHttpServerChildProcess(server_path) as http_server: + response = await http_server.get( + 'http://localhost:8005/g1', + headers={'accept': "application/n-quads"}) + assert ( + await response.text() + ) == '''<http://example.com/greeting> <http://www.w3.org/2000/01/rdf-schema#label> "hello world" <http://example.com/process> . + +''' + + +# @pytest.mark.asyncio +# async def test_server_returns_events(): +# async with RunHttpServerChildProcess(server_path) as http_server: +# response = await http_server.get('http://localhost:8005/g1/events') +# assert response.headers['content-type'] == 'x-sse-todo' +# assert (await response.text()) == 'clear event then add-patch event'
--- a/src/rdferry/__init__.py Sat Mar 16 13:45:09 2024 -0700 +++ b/src/rdferry/__init__.py Sat Mar 16 16:02:23 2024 -0700 @@ -1,2 +1,4 @@ from .server import StarletteServer # noqa: F401 +from .patch.patch import Patch # noqa: F401 +from .patchablegraph import PatchableGraph # noqa: F401 \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rdferry/patch/patch.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,16 @@ +from dataclasses import dataclass, field +from typing import Collection + +from rdflib.graph import _QuadType as Quad + + + +@dataclass +class Patch: + """ + immutable + + + """ + dels: Collection[Quad] = field(default_factory=set, hash=True) + adds: Collection[Quad] = field(default_factory=set, hash=True)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rdferry/patch_quads.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,44 @@ + + +from rdferry.rdflib_issues.contains_with_context_398 import inGraph +from rdflib import URIRef + + +def _fixContextToUri(spoc): + if not isinstance(spoc[3], URIRef): + return spoc[:3] + (spoc[3].identifier,) + return spoc + +def patchQuads(graph, deleteQuads, addQuads, perfect=False): + """ + Delete the sequence of given quads. Then add the given quads just + like addN would. If perfect is True, we'll error before the + deletes or before the adds (not a real transaction) if any of the + deletes isn't in the graph or if any of the adds was already in + the graph. + + These input quads use URIRef for the context, but + Graph(identifier=) is also allowed (which is what you'll get + sometimes from rdflib APIs). + """ + toDelete = [] + for spoc in deleteQuads: + spoc = _fixContextToUri(spoc) + + if perfect: + if inGraph(spoc, graph): + toDelete.append(spoc) + else: + raise ValueError("%r not in %r" % (spoc[:3], spoc[3])) + else: + graph.remove(spoc) + for spoc in toDelete: + graph.remove(spoc) + + if perfect: + addQuads = list(addQuads) + for spoc in addQuads: + spoc = _fixContextToUri(spoc) + if inGraph(spoc, graph): + raise ValueError("%r already in %r" % (spoc[:3], spoc[3])) + graph.addN(addQuads)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rdferry/patchablegraph.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,55 @@ +import asyncio +import itertools +import weakref +from rdferry.patch_quads import patchQuads +from rdferry.rdflib_issues.contains_with_context_398 import inGraph +from rdflib import ConjunctiveGraph +import logging +from rdferry.patch.patch import Patch +from prometheus_client import Counter, Gauge, Summary + +log = logging.getLogger('patchablegraph') +STATEMENT_COUNT = Gauge('statement_count', + 'current PatchableGraph graph size', + labelnames=['graph']) +PATCH_CALLS = Summary('patch_calls', + 'PatchableGraph.patch calls', + labelnames=['graph']) +_graphsInProcess = itertools.count() + + +class PatchableGraph: + """ + Holds an rdflib.ConjunctiveGraph and only edits it with Patch operations + (which can be subscribed to) + """ + + def __init__(self, label: str | None = None): + self._graph = ConjunctiveGraph() + # self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet() + + if label is None: + label = f'patchableGraph{next(_graphsInProcess)}' + self.label = label + log.info('making %r', label) + + def patch(self, p: Patch): + with PATCH_CALLS.labels(graph=self.label).time(): + # assuming no stmt is both in p.addQuads and p.delQuads. + dels = set([q for q in p.dels if inGraph(q, self._graph)]) + adds = set([q for q in p.adds if not inGraph(q, self._graph)]) + # minimizedP = Patch(dels, adds) + # if minimizedP.isNoop(): + # return + patchQuads(self._graph, + deleteQuads=dels, + addQuads=adds, + perfect=False) # true? + # if self._subscriptions: + # log.debug('PatchableGraph: patched; telling %s observers', + # len(self._subscriptions)) + # j = patchAsJson(p) + # for q in self._subscriptions: + # q.put_nowait(('patch', j)) + STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/rdferry/rdflib_issues/contains_with_context_398.py Sat Mar 16 16:02:23 2024 -0700 @@ -0,0 +1,22 @@ +# workaround for https://github.com/RDFLib/rdflib/issues/398 + +from rdflib import Graph + + +def inGraph(spoc, graph): + """ + c is just a URIRef. + """ + + c = spoc[3] + if isinstance(c, Graph): + c = c.identifier + + for spoc2 in graph.quads(spoc[:3]): + if spoc[:3] == spoc2[:3]: + c2 = spoc2[3] + if isinstance(c2, Graph): + c2 = c2.identifier + if c == c2: + return True + return False \ No newline at end of file
--- a/src/rdferry/server.py Sat Mar 16 13:45:09 2024 -0700 +++ b/src/rdferry/server.py Sat Mar 16 16:02:23 2024 -0700 @@ -1,9 +1,13 @@ -from typing import NoReturn +from functools import partial import uvicorn +from rdflib import plugin +from rdflib.serializer import Serializer from starlette.applications import Starlette +from starlette.requests import Request from starlette.responses import PlainTextResponse -from starlette.routing import Route + +from rdferry.patchablegraph import PatchableGraph class StarletteServer: @@ -17,6 +21,18 @@ if path == '/': self.root_route_is_set = True + def add_graph_routes(self, path: str, graph: PatchableGraph): + """Adds {path} and {path}/events""" + self.add_route(path, partial(self._on_graph_request, graph)) + + def _on_graph_request(self, graph: PatchableGraph, + request: Request) -> PlainTextResponse: + format = request.headers.get('Accept', '*/*') + if format == '*/*': + format = 'application/trig' + return PlainTextResponse(content=graph._graph.serialize(format=format), + media_type=format) + def serve(self): if not self.root_route_is_set: self.add_route('/', lambda req: PlainTextResponse('todo'))