changeset 10:52e1bb1532f2

serve_inline_graph
author drewp@bigasterisk.com
date Sat, 16 Mar 2024 16:02:23 -0700
parents b72f4ba1345d
children 0bc06da6bf74
files examples/_run_server_child.py examples/serve_inline_graph.py examples/serve_inline_graph_test.py src/rdferry/__init__.py src/rdferry/patch/patch.py src/rdferry/patch_quads.py src/rdferry/patchablegraph.py src/rdferry/rdflib_issues/contains_with_context_398.py src/rdferry/server.py
diffstat 9 files changed, 214 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/examples/_run_server_child.py	Sat Mar 16 13:45:09 2024 -0700
+++ b/examples/_run_server_child.py	Sat Mar 16 16:02:23 2024 -0700
@@ -20,9 +20,9 @@
         self.subprocess.terminate()
         await self.subprocess.wait()
 
-    async def get(self, url: str) -> aiohttp.ClientResponse:
+    async def get(self, url: str, headers=None) -> aiohttp.ClientResponse:
         while True:
             try:
-                return await self._session.get(url)
+                return await self._session.get(url, headers=headers)
             except aiohttp.ClientConnectorError:
                 await asyncio.sleep(0.05)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/serve_inline_graph.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,13 @@
+from rdferry import Patch, PatchableGraph, StarletteServer
+from rdflib import RDFS, Literal, Namespace
+
+EX = Namespace('http://example.com/')
+
+server = StarletteServer()
+
+g1 = PatchableGraph()
+g1.patch(Patch(adds=[(EX['greeting'], RDFS.label, Literal('hello world'), EX['process'])]))
+
+server.add_graph_routes('/g1', g1)
+
+server.serve()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/serve_inline_graph_test.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,42 @@
+from pathlib import Path
+
+import pytest
+
+from examples._run_server_child import RunHttpServerChildProcess
+
+server_path = Path('examples/serve_inline_graph.py')
+
+
+@pytest.mark.asyncio
+async def test_server_returns_n3():
+    async with RunHttpServerChildProcess(server_path) as http_server:
+        response = await http_server.get('http://localhost:8005/g1')
+        assert (await response.text()
+                ) == '''@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+
+<http://example.com/process> {
+    <http://example.com/greeting> rdfs:label "hello world" .
+}
+
+'''
+
+
+@pytest.mark.asyncio
+async def test_server_returns_trig():
+    async with RunHttpServerChildProcess(server_path) as http_server:
+        response = await http_server.get(
+            'http://localhost:8005/g1',
+            headers={'accept': "application/n-quads"})
+        assert (
+            await response.text()
+        ) == '''<http://example.com/greeting> <http://www.w3.org/2000/01/rdf-schema#label> "hello world" <http://example.com/process> .
+
+'''
+
+
+# @pytest.mark.asyncio
+# async def test_server_returns_events():
+#     async with RunHttpServerChildProcess(server_path) as http_server:
+#         response = await http_server.get('http://localhost:8005/g1/events')
+#         assert response.headers['content-type'] == 'x-sse-todo'
+#         assert (await response.text()) == 'clear event then add-patch event'
--- a/src/rdferry/__init__.py	Sat Mar 16 13:45:09 2024 -0700
+++ b/src/rdferry/__init__.py	Sat Mar 16 16:02:23 2024 -0700
@@ -1,2 +1,4 @@
 
 from .server import StarletteServer  # noqa: F401
+from .patch.patch import Patch  # noqa: F401
+from .patchablegraph import PatchableGraph  # noqa: F401
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/rdferry/patch/patch.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,16 @@
+from dataclasses import dataclass, field
+from typing import Collection
+
+from rdflib.graph import _QuadType as Quad
+
+
+
+@dataclass
+class Patch:
+    """
+    immutable
+
+
+    """
+    dels: Collection[Quad] = field(default_factory=set, hash=True)
+    adds: Collection[Quad] = field(default_factory=set, hash=True)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/rdferry/patch_quads.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,44 @@
+
+
+from rdferry.rdflib_issues.contains_with_context_398 import inGraph
+from rdflib import URIRef
+
+
+def _fixContextToUri(spoc):
+    if not isinstance(spoc[3], URIRef):
+        return spoc[:3] + (spoc[3].identifier,)
+    return spoc
+
+def patchQuads(graph, deleteQuads, addQuads, perfect=False):
+    """
+    Delete the sequence of given quads. Then add the given quads just
+    like addN would. If perfect is True, we'll error before the
+    deletes or before the adds (not a real transaction) if any of the
+    deletes isn't in the graph or if any of the adds was already in
+    the graph.
+
+    These input quads use URIRef for the context, but
+    Graph(identifier=) is also allowed (which is what you'll get
+    sometimes from rdflib APIs).
+    """
+    toDelete = []
+    for spoc in deleteQuads:
+        spoc = _fixContextToUri(spoc)
+
+        if perfect:
+            if inGraph(spoc, graph):
+                toDelete.append(spoc)
+            else:
+                raise ValueError("%r not in %r" % (spoc[:3], spoc[3]))
+        else:
+            graph.remove(spoc)
+    for spoc in toDelete:
+        graph.remove(spoc)
+
+    if perfect:
+        addQuads = list(addQuads)
+        for spoc in addQuads:
+            spoc = _fixContextToUri(spoc)
+            if inGraph(spoc, graph):
+                raise ValueError("%r already in %r" % (spoc[:3], spoc[3]))
+    graph.addN(addQuads)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/rdferry/patchablegraph.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,55 @@
+import asyncio
+import itertools
+import weakref
+from rdferry.patch_quads import patchQuads
+from rdferry.rdflib_issues.contains_with_context_398 import inGraph
+from rdflib import ConjunctiveGraph
+import logging
+from rdferry.patch.patch import Patch
+from prometheus_client import Counter, Gauge, Summary
+
+log = logging.getLogger('patchablegraph')
+STATEMENT_COUNT = Gauge('statement_count',
+                        'current PatchableGraph graph size',
+                        labelnames=['graph'])
+PATCH_CALLS = Summary('patch_calls',
+                      'PatchableGraph.patch calls',
+                      labelnames=['graph'])
+_graphsInProcess = itertools.count()
+
+
+class PatchableGraph:
+    """
+    Holds an rdflib.ConjunctiveGraph and only edits it with Patch operations
+    (which can be subscribed to)
+    """
+
+    def __init__(self, label: str | None = None):
+        self._graph = ConjunctiveGraph()
+        # self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
+
+        if label is None:
+            label = f'patchableGraph{next(_graphsInProcess)}'
+        self.label = label
+        log.info('making %r', label)
+
+    def patch(self, p: Patch):
+        with PATCH_CALLS.labels(graph=self.label).time():
+            # assuming no stmt is both in p.addQuads and p.delQuads.
+            dels = set([q for q in p.dels if inGraph(q, self._graph)])
+            adds = set([q for q in p.adds if not inGraph(q, self._graph)])
+            # minimizedP = Patch(dels, adds)
+            # if minimizedP.isNoop():
+            #     return
+            patchQuads(self._graph,
+                       deleteQuads=dels,
+                       addQuads=adds,
+                       perfect=False)  # true?
+            # if self._subscriptions:
+            #     log.debug('PatchableGraph: patched; telling %s observers',
+            #               len(self._subscriptions))
+            # j = patchAsJson(p)
+            # for q in self._subscriptions:
+            #     q.put_nowait(('patch', j))
+            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/rdferry/rdflib_issues/contains_with_context_398.py	Sat Mar 16 16:02:23 2024 -0700
@@ -0,0 +1,22 @@
+# workaround for https://github.com/RDFLib/rdflib/issues/398
+
+from rdflib import Graph
+
+
+def inGraph(spoc, graph):
+    """
+    c is just a URIRef.
+    """
+
+    c = spoc[3]
+    if isinstance(c, Graph):
+        c = c.identifier
+
+    for spoc2 in graph.quads(spoc[:3]):
+        if spoc[:3] == spoc2[:3]:
+            c2 = spoc2[3]
+            if isinstance(c2, Graph):
+                c2 = c2.identifier
+            if c == c2:
+                return True
+    return False
\ No newline at end of file
--- a/src/rdferry/server.py	Sat Mar 16 13:45:09 2024 -0700
+++ b/src/rdferry/server.py	Sat Mar 16 16:02:23 2024 -0700
@@ -1,9 +1,13 @@
-from typing import NoReturn
+from functools import partial
 
 import uvicorn
+from rdflib import plugin
+from rdflib.serializer import Serializer
 from starlette.applications import Starlette
+from starlette.requests import Request
 from starlette.responses import PlainTextResponse
-from starlette.routing import Route
+
+from rdferry.patchablegraph import PatchableGraph
 
 
 class StarletteServer:
@@ -17,6 +21,18 @@
         if path == '/':
             self.root_route_is_set = True
 
+    def add_graph_routes(self, path: str, graph: PatchableGraph):
+        """Adds {path} and {path}/events"""
+        self.add_route(path, partial(self._on_graph_request, graph))
+
+    def _on_graph_request(self, graph: PatchableGraph,
+                          request: Request) -> PlainTextResponse:
+        format = request.headers.get('Accept', '*/*') 
+        if format == '*/*':
+            format = 'application/trig'
+        return PlainTextResponse(content=graph._graph.serialize(format=format),
+                                 media_type=format)
+
     def serve(self):
         if not self.root_route_is_set:
             self.add_route('/', lambda req: PlainTextResponse('todo'))