changeset 25:e11d407c46f8

rewrite for asyncio and starlette
author drewp@bigasterisk.com
date Sat, 23 Apr 2022 23:58:41 -0700
parents 1eac25669333
children 4668117dcd24
files __init__.py patch_cyclone.py patchablegraph.py patchablegraph_handler.py setup.py
diffstat 5 files changed, 180 insertions(+), 195 deletions(-) [+]
line wrap: on
line diff
--- a/__init__.py	Sat Apr 23 23:48:55 2022 -0700
+++ b/__init__.py	Sat Apr 23 23:58:41 2022 -0700
@@ -1,1 +1,2 @@
-from .patchablegraph import PatchableGraph, CycloneGraphEventsHandler, CycloneGraphHandler, jsonFromPatch
+from .patchablegraph import PatchableGraph, jsonFromPatch
+import patchablegraph_handler as handler
--- a/patch_cyclone.py	Sat Apr 23 23:48:55 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,20 +0,0 @@
-
-
-def patch_sse():
-    from cyclone import escape
-    import cyclone.sse
-    def new_sendEvent(self, message, event=None, eid=None, retry=None):
-        if isinstance(message, dict):
-            message = escape.json_encode(message)
-        if isinstance(message, str):
-            message = message.encode("utf-8")
-        assert isinstance(message, bytes)
-        if eid:
-            self.transport.write(b"id: " + eid.encode("utf-8") + b"\n")
-        if event:
-            self.transport.write(b"event: " + event + b"\n")
-        if retry:
-            self.transport.write(b"retry: " + retry.encode("utf-8") + b"\n")
-        self.transport.write(b"data: " + message + b"\n\n")
-
-    cyclone.sse.SSEHandler.sendEvent = new_sendEvent
--- a/patchablegraph.py	Sat Apr 23 23:48:55 2022 -0700
+++ b/patchablegraph.py	Sat Apr 23 23:58:41 2022 -0700
@@ -20,27 +20,24 @@
 differences between RDF graphs
 
 """
-import html
+import asyncio
 import itertools
 import json
 import logging
-from typing import Callable, List, Optional, cast
+import weakref
+from typing import Callable, List, NewType, Optional, cast, Set
 
-import cyclone.sse
-import cyclone.web
-from cycloneerr import PrettyErrorHandler
 from prometheus_client import Counter, Gauge, Summary
 from rdfdb.grapheditapi import GraphEditApi
 from rdfdb.patch import Patch
 from rdfdb.rdflibpatch import inGraph, patchQuads
 from rdflib import ConjunctiveGraph
-from rdflib.namespace import NamespaceManager
 from rdflib.parser import StringInputSource
 from rdflib.plugins.serializers.jsonld import from_rdf
 
-from . import patch_cyclone
+JsonSerializedPatch = NewType('JsonSerializedPatch', str)
+JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)
 
-patch_cyclone.patch_sse()
 log = logging.getLogger('patchablegraph')
 
 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
@@ -53,25 +50,24 @@
 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
 def _graphFromQuads2(q):
     g = ConjunctiveGraph()
-    #g.addN(q) # no effect on nquad output
+    # g.addN(q) # no effect on nquad output
     for s, p, o, c in q:
         g.get_context(c).add((s, p, o))  # kind of works with broken rdflib nquad serializer code
-        #g.store.add((s,p,o), c) # no effect on nquad output
+        # g.store.add((s,p,o), c) # no effect on nquad output
     return g
 
 
-def jsonFromPatch(p: Patch) -> str:
-    return json.dumps(
-        {'patch': {
-            'adds': from_rdf(_graphFromQuads2(p.addQuads)),
-            'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
-        }})
+def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
+    return cast(JsonSerializedPatch, json.dumps({'patch': {
+        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
+        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
+    }}))
 
 
 patchAsJson = jsonFromPatch  # deprecated name
 
 
-def patchFromJson(j: str) -> Patch:
+def patchFromJson(j: JsonSerializedPatch) -> Patch:
     body = json.loads(j)['patch']
     a = ConjunctiveGraph()
     a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
@@ -80,10 +76,10 @@
     return Patch(addGraph=a, delGraph=d)
 
 
-def graphAsJson(g: ConjunctiveGraph) -> str:
+def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
     # This is not the same as g.serialize(format='json-ld')! That
     # version omits literal datatypes.
-    return json.dumps(from_rdf(g))
+    return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g)))
 
 
 _graphsInProcess = itertools.count()
@@ -97,10 +93,12 @@
 
     def __init__(self, label: Optional[str] = None):
         self._graph = ConjunctiveGraph()
-        self._observers: List[Callable[[str], None]] = []
+        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
+
         if label is None:
             label = f'patchableGraph{next(_graphsInProcess)}'
         self.label = label
+        log.info('making %r', label)
 
     def serialize(self, *arg, **kw) -> bytes:
         with SERIALIZE_CALLS.labels(graph=self.label).time():
@@ -115,159 +113,32 @@
             if minimizedP.isNoop():
                 return
             patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
-            for ob in self._observers:
-                ob(patchAsJson(p))
+            if self._subscriptions:
+                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
+            j = patchAsJson(p)
+            for q in self._subscriptions:
+                q.put_nowait(('patch', j))
             STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
 
-    def asJsonLd(self) -> str:
+    def asJsonLd(self) -> JsonLdSerializedGraph:
         return graphAsJson(self._graph)
 
-    def addObserver(self, onPatch: Callable[[str], None]):
-        self._observers.append(onPatch)
-        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))
-        OBSERVERS_ADDED.labels(graph=self.label).inc()
+    def subscribeToPatches(self) -> asyncio.Queue:
+        q = asyncio.Queue()
+        qref = weakref.ref(q, self._onUnsubscribe)
+        self._initialSubscribeEvents(qref)
+        return q
 
-    def removeObserver(self, onPatch: Callable[[str], None]):
-        try:
-            self._observers.remove(onPatch)
-        except ValueError:
-            pass
-        self._currentObservers = len(self._observers)
+    def _initialSubscribeEvents(self, qref):
+        q = qref()
+        log.info('new sub queue %s', q)
+        self._subscriptions.add(q)  # when caller forgets about queue, we will too
+        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))
+        OBSERVERS_ADDED.labels(graph=self.label).inc()
+        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches
+
+    def _onUnsubscribe(self, qref):
+        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions))  # minus one?
 
     def setToGraph(self, newGraph: ConjunctiveGraph):
         self.patch(Patch.fromDiff(self._graph, newGraph))
-
-
-SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
-
-
-class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
-
-    def initialize(self, masterGraph: PatchableGraph):
-        self.masterGraph = masterGraph
-
-    def get(self):
-        with SEND_SIMPLE_GRAPH.time():
-            self._writeGraphResponse()
-
-    def _writeGraphResponse(self):
-        acceptHeader = self.request.headers.get(
-            'Accept',
-            # see https://github.com/fiorix/cyclone/issues/20
-            self.request.headers.get('accept', ''))
-
-        if acceptHeader == 'application/nquads':
-            self.set_header('Content-type', 'application/nquads')
-            self.masterGraph.serialize(self, format='nquads')
-        elif acceptHeader == 'application/ld+json':
-            self.set_header('Content-type', 'application/ld+json')
-            self.masterGraph.serialize(self, format='json-ld', indent=2)
-        else:
-            if acceptHeader.startswith('text/html'):
-                self._writeGraphForBrowser()
-                return
-            self.set_header('Content-type', 'application/x-trig')
-            self.masterGraph.serialize(self, format='trig')
-
-    def _writeGraphForBrowser(self):
-        # We think this is a browser, so respond with a live graph view
-        # (todo)
-        self.set_header('Content-type', 'text/html')
-
-        self.write(b'''
-        <html><body><pre>''')
-
-        ns = NamespaceManager(self.masterGraph._graph)
-        # maybe these could be on the PatchableGraph instance
-        ns.bind('ex', 'http://example.com/')
-        ns.bind('', 'http://projects.bigasterisk.com/room/')
-        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
-        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
-
-        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
-            g = g.identifier
-            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
-            self.write(html.escape(nquadLine).encode('utf8'))
-
-        self.write(b'''
-        </pre>
-        <p>
-          <a href="#">[refresh]</a>
-          <label><input type="checkbox"> Auto-refresh</label>
-        </p>
-        <script>
-
-        if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
-          document.querySelector("input").checked = true;
-          setTimeout(() => {
-            requestAnimationFrame(() => {
-              window.location.replace(window.location.href);
-            });
-          }, 2000);
-        }
-
-        document.querySelector("a").addEventListener("click", (ev) => {
-          ev.preventDefault();
-          window.location.replace(window.location.href);
-
-        });
-        document.querySelector("input").addEventListener("change", (ev) => {
-          if (document.querySelector("input").checked) {
-             const u = new URL(window.location);
-             u.searchParams.set('autorefresh', 'on');
-             window.location.replace(u.href);
-          } else {
-             const u = new URL(window.location);
-             u.searchParams.delete('autorefresh');
-             window.location.replace(u.href);
-          }
-        });
-
-        </script>
-        </body></html>
-        ''')
-
-
-SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
-SEND_PATCH = Summary('send_patch', 'patch SSE events')
-
-
-class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
-    """
-    One session with one client.
-
-    returns current graph plus future patches to keep remote version
-    in sync with ours.
-
-    intsead of turning off buffering all over, it may work for this
-    response to send 'x-accel-buffering: no', per
-    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
-    """
-
-    def __init__(self, application, request, masterGraph):
-        cyclone.sse.SSEHandler.__init__(self, application, request)
-        self.masterGraph = masterGraph
-
-    def bind(self):
-        with SEND_FULL_GRAPH.time():
-            graphJson = self.masterGraph.asJsonLd()
-            log.debug("send fullGraph event: %s", graphJson)
-            self.sendEvent(message=graphJson, event=b'fullGraph')
-            self.masterGraph.addObserver(self.onPatch)
-
-    def onPatch(self, patchJson):
-        with SEND_PATCH.time():
-            # throttle and combine patches here- ideally we could see how
-            # long the latency to the client is to make a better rate choice
-            self.sendEvent(message=patchJson, event=b'patch')
-
-    def unbind(self):
-        self.masterGraph.removeObserver(self.onPatch)
-
-    def flush(self):
-        if getattr(self.settings, 'allowOrigin'):
-            allow = self.settings.allowOrigin
-            if allow != b'*':
-                raise NotImplementedError()
-            self.set_header(b"Access-Control-Allow-Origin", allow)
-        return CycloneGraphEventsHandler.flush(self)
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/patchablegraph_handler.py	Sat Apr 23 23:58:41 2022 -0700
@@ -0,0 +1,135 @@
+import html
+import logging
+from typing import Callable, Tuple
+
+from prometheus_client import Summary
+from rdflib.namespace import NamespaceManager
+from sse_starlette import ServerSentEvent
+from sse_starlette.sse import EventSourceResponse
+from starlette.requests import Request
+from starlette.responses import Response
+
+from patchablegraph import PatchableGraph
+
+log = logging.getLogger('patchablegraph')
+
+SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
+SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
+SEND_PATCH = Summary('send_patch', 'patch SSE events')
+
+
+def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
+    """
+    e.g.
+            Route('/graph/environment', StaticGraph(masterGraph)),
+    """
+
+    @SEND_SIMPLE_GRAPH.time()
+    def handle(request: Request) -> Response:
+        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
+        r = Response(content=content)
+        r.media_type = ctype
+        return r
+
+    return handle
+
+
+def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
+    if accept == 'application/nquads':
+        return 'application/nquads', masterGraph.serialize(format='nquads')
+    elif accept == 'application/ld+json':
+        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
+    else:
+        if accept.startswith('text/html'):
+            return _writeGraphForBrowser(masterGraph)
+        return 'application/x-trig', masterGraph.serialize(format='trig')
+
+
+def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
+    # We think this is a browser, so respond with a live graph view
+    # (todo)
+    out = (b'''
+    <html><body><pre>''')
+
+    ns = NamespaceManager(masterGraph._graph)
+    # maybe these could be on the PatchableGraph instance
+    ns.bind('ex', 'http://example.com/')
+    ns.bind('', 'http://projects.bigasterisk.com/room/')
+    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
+    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
+
+    for s, p, o, g in sorted(masterGraph._graph.quads()):
+        g = g.identifier
+        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
+        out += html.escape(nquadLine).encode('utf8')
+
+    out += b'''
+    </pre>
+    <p>
+        <a href="#">[refresh]</a>
+        <label><input type="checkbox"> Auto-refresh</label>
+    </p>
+    <script>
+
+    if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
+        document.querySelector("input").checked = true;
+        setTimeout(() => {
+        requestAnimationFrame(() => {
+            window.location.replace(window.location.href);
+        });
+        }, 2000);
+    }
+
+    document.querySelector("a").addEventListener("click", (ev) => {
+        ev.preventDefault();
+        window.location.replace(window.location.href);
+
+    });
+    document.querySelector("input").addEventListener("change", (ev) => {
+        if (document.querySelector("input").checked) {
+            const u = new URL(window.location);
+            u.searchParams.set('autorefresh', 'on');
+            window.location.replace(u.href);
+        } else {
+            const u = new URL(window.location);
+            u.searchParams.delete('autorefresh');
+            window.location.replace(u.href);
+        }
+    });
+
+    </script>
+    </body></html>
+    '''
+    return 'text/html', out
+
+
+def GraphEvents(masterGraph: PatchableGraph):
+    """
+    e.g.
+            Route('/graph/environment/events', GraphEvents(masterGraph)),
+    """
+
+    async def generateEvents():
+        events = masterGraph.subscribeToPatches()
+        while True:  # we'll get cancelled by EventSourceResponse when the conn drops
+            etype, data = await events.get()
+            # Are there more to get? We might throttle and combine patches here- ideally we could see how
+            # long the latency to the client is to make a better rate choice
+            metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
+            with metric.time():
+                yield ServerSentEvent(event=etype, data=data)
+
+    async def handle(request: Request):
+        """
+        One session with one client.
+
+        returns current graph plus future patches to keep remote version
+        in sync with ours.
+
+        instead of turning off buffering all over, it may work for this
+        response to send 'x-accel-buffering: no', per
+        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
+        """
+        return EventSourceResponse(generateEvents())
+
+    return handle
--- a/setup.py	Sat Apr 23 23:48:55 2022 -0700
+++ b/setup.py	Sat Apr 23 23:58:41 2022 -0700
@@ -2,19 +2,17 @@
  
 setup(
     name='patchablegraph',
-    version='0.19.0',
+    version='1.0.0',
     packages=['patchablegraph'],
     package_dir={'patchablegraph': ''},
     install_requires=[
-        'cyclone',
-        'twisted',
         'rdflib >= 6.0.1',
         'rdfdb >= 0.8.0',
         'prometheus_client',
-        'cycloneerr',
-        'twisted_sse >= 0.3.0',
+        'sse_starlette>=0.10.3',
+        'starlette>=0.19.1'
     ],
-    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-0.19.0.tar.gz',
+    url='https://projects.bigasterisk.com/patchablegraph/patchablegraph-1.0.0.tar.gz',
     author='Drew Perttula',
     author_email='drewp@bigasterisk.com',
 )