# HG changeset patch # User drewp@bigasterisk.com # Date 1650783521 25200 # Node ID e11d407c46f8aa6170bd9f196df921c2f348eaf6 # Parent 1eac256693335958e6b5c28adf09685ced0da1db rewrite for asyncio and starlette diff -r 1eac25669333 -r e11d407c46f8 __init__.py --- a/__init__.py Sat Apr 23 23:48:55 2022 -0700 +++ b/__init__.py Sat Apr 23 23:58:41 2022 -0700 @@ -1,1 +1,2 @@ -from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch +from .patchablegraph import PatchableGraph, jsonFromPatch +import patchablegraph_handler as handler diff -r 1eac25669333 -r e11d407c46f8 patch_cyclone.py --- a/patch_cyclone.py Sat Apr 23 23:48:55 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,20 +0,0 @@ - - -def patch_sse(): - from cyclone import escape - import cyclone.sse - def new_sendEvent(self, message, event=None, eid=None, retry=None): - if isinstance(message, dict): - message = escape.json_encode(message) - if isinstance(message, str): - message = message.encode("utf-8") - assert isinstance(message, bytes) - if eid: - self.transport.write(b"id: " + eid.encode("utf-8") + b"\n") - if event: - self.transport.write(b"event: " + event + b"\n") - if retry: - self.transport.write(b"retry: " + retry.encode("utf-8") + b"\n") - self.transport.write(b"data: " + message + b"\n\n") - - cyclone.sse.SSEHandler.sendEvent = new_sendEvent diff -r 1eac25669333 -r e11d407c46f8 patchablegraph.py --- a/patchablegraph.py Sat Apr 23 23:48:55 2022 -0700 +++ b/patchablegraph.py Sat Apr 23 23:58:41 2022 -0700 @@ -20,27 +20,24 @@ differences between RDF graphs """ -import html +import asyncio import itertools import json import logging -from typing import Callable, List, Optional, cast +import weakref +from typing import Callable, List, NewType, Optional, cast, Set -import cyclone.sse -import cyclone.web -from cycloneerr import PrettyErrorHandler from prometheus_client import Counter, Gauge, Summary from rdfdb.grapheditapi import GraphEditApi from rdfdb.patch import Patch from rdfdb.rdflibpatch import inGraph, patchQuads from rdflib import ConjunctiveGraph -from rdflib.namespace import NamespaceManager from rdflib.parser import StringInputSource from rdflib.plugins.serializers.jsonld import from_rdf -from . import patch_cyclone +JsonSerializedPatch = NewType('JsonSerializedPatch', str) +JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str) -patch_cyclone.patch_sse() log = logging.getLogger('patchablegraph') SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph']) @@ -53,25 +50,24 @@ # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py def _graphFromQuads2(q): g = ConjunctiveGraph() - #g.addN(q) # no effect on nquad output + # g.addN(q) # no effect on nquad output for s, p, o, c in q: g.get_context(c).add((s, p, o)) # kind of works with broken rdflib nquad serializer code - #g.store.add((s,p,o), c) # no effect on nquad output + # g.store.add((s,p,o), c) # no effect on nquad output return g -def jsonFromPatch(p: Patch) -> str: - return json.dumps( - {'patch': { - 'adds': from_rdf(_graphFromQuads2(p.addQuads)), - 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), - }}) +def jsonFromPatch(p: Patch) -> JsonSerializedPatch: + return cast(JsonSerializedPatch, json.dumps({'patch': { + 'adds': from_rdf(_graphFromQuads2(p.addQuads)), + 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), + }})) patchAsJson = jsonFromPatch # deprecated name -def patchFromJson(j: str) -> Patch: +def patchFromJson(j: JsonSerializedPatch) -> Patch: body = json.loads(j)['patch'] a = ConjunctiveGraph() a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') @@ -80,10 +76,10 @@ return Patch(addGraph=a, delGraph=d) -def graphAsJson(g: ConjunctiveGraph) -> str: +def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph: # This is not the same as g.serialize(format='json-ld')! That # version omits literal datatypes. - return json.dumps(from_rdf(g)) + return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g))) _graphsInProcess = itertools.count() @@ -97,10 +93,12 @@ def __init__(self, label: Optional[str] = None): self._graph = ConjunctiveGraph() - self._observers: List[Callable[[str], None]] = [] + self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet() + if label is None: label = f'patchableGraph{next(_graphsInProcess)}' self.label = label + log.info('making %r', label) def serialize(self, *arg, **kw) -> bytes: with SERIALIZE_CALLS.labels(graph=self.label).time(): @@ -115,159 +113,32 @@ if minimizedP.isNoop(): return patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true? - for ob in self._observers: - ob(patchAsJson(p)) + if self._subscriptions: + log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions)) + j = patchAsJson(p) + for q in self._subscriptions: + q.put_nowait(('patch', j)) STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) - def asJsonLd(self) -> str: + def asJsonLd(self) -> JsonLdSerializedGraph: return graphAsJson(self._graph) - def addObserver(self, onPatch: Callable[[str], None]): - self._observers.append(onPatch) - OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers)) - OBSERVERS_ADDED.labels(graph=self.label).inc() + def subscribeToPatches(self) -> asyncio.Queue: + q = asyncio.Queue() + qref = weakref.ref(q, self._onUnsubscribe) + self._initialSubscribeEvents(qref) + return q - def removeObserver(self, onPatch: Callable[[str], None]): - try: - self._observers.remove(onPatch) - except ValueError: - pass - self._currentObservers = len(self._observers) + def _initialSubscribeEvents(self, qref): + q = qref() + log.info('new sub queue %s', q) + self._subscriptions.add(q) # when caller forgets about queue, we will too + OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) + OBSERVERS_ADDED.labels(graph=self.label).inc() + q.put_nowait(('graph', self.asJsonLd())) # this should be chunked, or just done as reset + patches + + def _onUnsubscribe(self, qref): + OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) # minus one? def setToGraph(self, newGraph: ConjunctiveGraph): self.patch(Patch.fromDiff(self._graph, newGraph)) - - -SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') - - -class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): - - def initialize(self, masterGraph: PatchableGraph): - self.masterGraph = masterGraph - - def get(self): - with SEND_SIMPLE_GRAPH.time(): - self._writeGraphResponse() - - def _writeGraphResponse(self): - acceptHeader = self.request.headers.get( - 'Accept', - # see https://github.com/fiorix/cyclone/issues/20 - self.request.headers.get('accept', '')) - - if acceptHeader == 'application/nquads': - self.set_header('Content-type', 'application/nquads') - self.masterGraph.serialize(self, format='nquads') - elif acceptHeader == 'application/ld+json': - self.set_header('Content-type', 'application/ld+json') - self.masterGraph.serialize(self, format='json-ld', indent=2) - else: - if acceptHeader.startswith('text/html'): - self._writeGraphForBrowser() - return - self.set_header('Content-type', 'application/x-trig') - self.masterGraph.serialize(self, format='trig') - - def _writeGraphForBrowser(self): - # We think this is a browser, so respond with a live graph view - # (todo) - self.set_header('Content-type', 'text/html') - - self.write(b''' -
''')
-
-        ns = NamespaceManager(self.masterGraph._graph)
-        # maybe these could be on the PatchableGraph instance
-        ns.bind('ex', 'http://example.com/')
-        ns.bind('', 'http://projects.bigasterisk.com/room/')
-        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
-        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
-
-        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
-            g = g.identifier
-            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
-            self.write(html.escape(nquadLine).encode('utf8'))
-
-        self.write(b'''
-        
-

- [refresh] - -

- - - ''') - - -SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') -SEND_PATCH = Summary('send_patch', 'patch SSE events') - - -class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): - """ - One session with one client. - - returns current graph plus future patches to keep remote version - in sync with ours. - - intsead of turning off buffering all over, it may work for this - response to send 'x-accel-buffering: no', per - http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering - """ - - def __init__(self, application, request, masterGraph): - cyclone.sse.SSEHandler.__init__(self, application, request) - self.masterGraph = masterGraph - - def bind(self): - with SEND_FULL_GRAPH.time(): - graphJson = self.masterGraph.asJsonLd() - log.debug("send fullGraph event: %s", graphJson) - self.sendEvent(message=graphJson, event=b'fullGraph') - self.masterGraph.addObserver(self.onPatch) - - def onPatch(self, patchJson): - with SEND_PATCH.time(): - # throttle and combine patches here- ideally we could see how - # long the latency to the client is to make a better rate choice - self.sendEvent(message=patchJson, event=b'patch') - - def unbind(self): - self.masterGraph.removeObserver(self.onPatch) - - def flush(self): - if getattr(self.settings, 'allowOrigin'): - allow = self.settings.allowOrigin - if allow != b'*': - raise NotImplementedError() - self.set_header(b"Access-Control-Allow-Origin", allow) - return CycloneGraphEventsHandler.flush(self) \ No newline at end of file diff -r 1eac25669333 -r e11d407c46f8 patchablegraph_handler.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/patchablegraph_handler.py Sat Apr 23 23:58:41 2022 -0700 @@ -0,0 +1,135 @@ +import html +import logging +from typing import Callable, Tuple + +from prometheus_client import Summary +from rdflib.namespace import NamespaceManager +from sse_starlette import ServerSentEvent +from sse_starlette.sse import EventSourceResponse +from starlette.requests import Request +from starlette.responses import Response + +from patchablegraph import PatchableGraph + +log = logging.getLogger('patchablegraph') + +SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') +SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') +SEND_PATCH = Summary('send_patch', 'patch SSE events') + + +def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]: + """ + e.g. + Route('/graph/environment', StaticGraph(masterGraph)), + """ + + @SEND_SIMPLE_GRAPH.time() + def handle(request: Request) -> Response: + ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default='')) + r = Response(content=content) + r.media_type = ctype + return r + + return handle + + +def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]: + if accept == 'application/nquads': + return 'application/nquads', masterGraph.serialize(format='nquads') + elif accept == 'application/ld+json': + return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2) + else: + if accept.startswith('text/html'): + return _writeGraphForBrowser(masterGraph) + return 'application/x-trig', masterGraph.serialize(format='trig') + + +def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]: + # We think this is a browser, so respond with a live graph view + # (todo) + out = (b''' +
''')
+
+    ns = NamespaceManager(masterGraph._graph)
+    # maybe these could be on the PatchableGraph instance
+    ns.bind('ex', 'http://example.com/')
+    ns.bind('', 'http://projects.bigasterisk.com/room/')
+    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
+
+    for s, p, o, g in sorted(masterGraph._graph.quads()):
+        g = g.identifier
+        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
+        out += html.escape(nquadLine).encode('utf8')
+
+    out += b'''
+    
+

+ [refresh] + +

+ + + ''' + return 'text/html', out + + +def GraphEvents(masterGraph: PatchableGraph): + """ + e.g. + Route('/graph/environment/events', GraphEvents(masterGraph)), + """ + + async def generateEvents(): + events = masterGraph.subscribeToPatches() + while True: # we'll get cancelled by EventSourceResponse when the conn drops + etype, data = await events.get() + # Are there more to get? We might throttle and combine patches here- ideally we could see how + # long the latency to the client is to make a better rate choice + metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype] + with metric.time(): + yield ServerSentEvent(event=etype, data=data) + + async def handle(request: Request): + """ + One session with one client. + + returns current graph plus future patches to keep remote version + in sync with ours. + + instead of turning off buffering all over, it may work for this + response to send 'x-accel-buffering: no', per + http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering + """ + return EventSourceResponse(generateEvents()) + + return handle diff -r 1eac25669333 -r e11d407c46f8 setup.py --- a/setup.py Sat Apr 23 23:48:55 2022 -0700 +++ b/setup.py Sat Apr 23 23:58:41 2022 -0700 @@ -2,19 +2,17 @@ setup( name='patchablegraph', - version='0.19.0', + version='1.0.0', packages=['patchablegraph'], package_dir={'patchablegraph': ''}, install_requires=[ - 'cyclone', - 'twisted', 'rdflib >= 6.0.1', 'rdfdb >= 0.8.0', 'prometheus_client', - 'cycloneerr', - 'twisted_sse >= 0.3.0', + 'sse_starlette>=0.10.3', + 'starlette>=0.19.1' ], - url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.19.0.tar.gz', + url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-1.0.0.tar.gz', author='Drew Perttula', author_email='drewp@bigasterisk.com', )