view patchablegraph_handler.py @ 25:e11d407c46f8

rewrite for asyncio and starlette
author drewp@bigasterisk.com
date Sat, 23 Apr 2022 23:58:41 -0700
parents
children
line wrap: on
line source

import html
import logging
from typing import Callable, Tuple

from prometheus_client import Summary
from rdflib.namespace import NamespaceManager
from sse_starlette import ServerSentEvent
from sse_starlette.sse import EventSourceResponse
from starlette.requests import Request
from starlette.responses import Response

from patchablegraph import PatchableGraph

log = logging.getLogger('patchablegraph')

SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')


def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
    """
    e.g.
            Route('/graph/environment', StaticGraph(masterGraph)),
    """

    @SEND_SIMPLE_GRAPH.time()
    def handle(request: Request) -> Response:
        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
        r = Response(content=content)
        r.media_type = ctype
        return r

    return handle


def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
    if accept == 'application/nquads':
        return 'application/nquads', masterGraph.serialize(format='nquads')
    elif accept == 'application/ld+json':
        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
    else:
        if accept.startswith('text/html'):
            return _writeGraphForBrowser(masterGraph)
        return 'application/x-trig', masterGraph.serialize(format='trig')


def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
    # We think this is a browser, so respond with a live graph view
    # (todo)
    out = (b'''
    <html><body><pre>''')

    ns = NamespaceManager(masterGraph._graph)
    # maybe these could be on the PatchableGraph instance
    ns.bind('ex', 'http://example.com/')
    ns.bind('', 'http://projects.bigasterisk.com/room/')
    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

    for s, p, o, g in sorted(masterGraph._graph.quads()):
        g = g.identifier
        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
        out += html.escape(nquadLine).encode('utf8')

    out += b'''
    </pre>
    <p>
        <a href="#">[refresh]</a>
        <label><input type="checkbox"> Auto-refresh</label>
    </p>
    <script>

    if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
        document.querySelector("input").checked = true;
        setTimeout(() => {
        requestAnimationFrame(() => {
            window.location.replace(window.location.href);
        });
        }, 2000);
    }

    document.querySelector("a").addEventListener("click", (ev) => {
        ev.preventDefault();
        window.location.replace(window.location.href);

    });
    document.querySelector("input").addEventListener("change", (ev) => {
        if (document.querySelector("input").checked) {
            const u = new URL(window.location);
            u.searchParams.set('autorefresh', 'on');
            window.location.replace(u.href);
        } else {
            const u = new URL(window.location);
            u.searchParams.delete('autorefresh');
            window.location.replace(u.href);
        }
    });

    </script>
    </body></html>
    '''
    return 'text/html', out


def GraphEvents(masterGraph: PatchableGraph):
    """
    e.g.
            Route('/graph/environment/events', GraphEvents(masterGraph)),
    """

    async def generateEvents():
        events = masterGraph.subscribeToPatches()
        while True:  # we'll get cancelled by EventSourceResponse when the conn drops
            etype, data = await events.get()
            # Are there more to get? We might throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
            with metric.time():
                yield ServerSentEvent(event=etype, data=data)

    async def handle(request: Request):
        """
        One session with one client.

        returns current graph plus future patches to keep remote version
        in sync with ours.

        instead of turning off buffering all over, it may work for this
        response to send 'x-accel-buffering: no', per
        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
        """
        return EventSourceResponse(generateEvents())

    return handle