Mercurial > code > home > repos > patchablegraph
view patchablegraph_handler.py @ 25:e11d407c46f8
rewrite for asyncio and starlette
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Apr 2022 23:58:41 -0700 |
parents | |
children |
line wrap: on
line source
import html import logging from typing import Callable, Tuple from prometheus_client import Summary from rdflib.namespace import NamespaceManager from sse_starlette import ServerSentEvent from sse_starlette.sse import EventSourceResponse from starlette.requests import Request from starlette.responses import Response from patchablegraph import PatchableGraph log = logging.getLogger('patchablegraph') SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') SEND_PATCH = Summary('send_patch', 'patch SSE events') def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]: """ e.g. Route('/graph/environment', StaticGraph(masterGraph)), """ @SEND_SIMPLE_GRAPH.time() def handle(request: Request) -> Response: ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default='')) r = Response(content=content) r.media_type = ctype return r return handle def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]: if accept == 'application/nquads': return 'application/nquads', masterGraph.serialize(format='nquads') elif accept == 'application/ld+json': return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2) else: if accept.startswith('text/html'): return _writeGraphForBrowser(masterGraph) return 'application/x-trig', masterGraph.serialize(format='trig') def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]: # We think this is a browser, so respond with a live graph view # (todo) out = (b''' <html><body><pre>''') ns = NamespaceManager(masterGraph._graph) # maybe these could be on the PatchableGraph instance ns.bind('ex', 'http://example.com/') ns.bind('', 'http://projects.bigasterisk.com/room/') ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#") ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#") for s, p, o, g in sorted(masterGraph._graph.quads()): g = g.identifier nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n' out += html.escape(nquadLine).encode('utf8') out += b''' </pre> <p> <a href="#">[refresh]</a> <label><input type="checkbox"> Auto-refresh</label> </p> <script> if (new URL(window.location).searchParams.get('autorefresh') == 'on') { document.querySelector("input").checked = true; setTimeout(() => { requestAnimationFrame(() => { window.location.replace(window.location.href); }); }, 2000); } document.querySelector("a").addEventListener("click", (ev) => { ev.preventDefault(); window.location.replace(window.location.href); }); document.querySelector("input").addEventListener("change", (ev) => { if (document.querySelector("input").checked) { const u = new URL(window.location); u.searchParams.set('autorefresh', 'on'); window.location.replace(u.href); } else { const u = new URL(window.location); u.searchParams.delete('autorefresh'); window.location.replace(u.href); } }); </script> </body></html> ''' return 'text/html', out def GraphEvents(masterGraph: PatchableGraph): """ e.g. Route('/graph/environment/events', GraphEvents(masterGraph)), """ async def generateEvents(): events = masterGraph.subscribeToPatches() while True: # we'll get cancelled by EventSourceResponse when the conn drops etype, data = await events.get() # Are there more to get? We might throttle and combine patches here- ideally we could see how # long the latency to the client is to make a better rate choice metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype] with metric.time(): yield ServerSentEvent(event=etype, data=data) async def handle(request: Request): """ One session with one client. returns current graph plus future patches to keep remote version in sync with ours. instead of turning off buffering all over, it may work for this response to send 'x-accel-buffering: no', per http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering """ return EventSourceResponse(generateEvents()) return handle