comparison patchablegraph_handler.py @ 25:e11d407c46f8

rewrite for asyncio and starlette
author drewp@bigasterisk.com
date Sat, 23 Apr 2022 23:58:41 -0700
parents
children
comparison
equal deleted inserted replaced
24:1eac25669333 25:e11d407c46f8
1 import html
2 import logging
3 from typing import Callable, Tuple
4
5 from prometheus_client import Summary
6 from rdflib.namespace import NamespaceManager
7 from sse_starlette import ServerSentEvent
8 from sse_starlette.sse import EventSourceResponse
9 from starlette.requests import Request
10 from starlette.responses import Response
11
12 from patchablegraph import PatchableGraph
13
14 log = logging.getLogger('patchablegraph')
15
16 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
17 SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
18 SEND_PATCH = Summary('send_patch', 'patch SSE events')
19
20
21 def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
22 """
23 e.g.
24 Route('/graph/environment', StaticGraph(masterGraph)),
25 """
26
27 @SEND_SIMPLE_GRAPH.time()
28 def handle(request: Request) -> Response:
29 ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
30 r = Response(content=content)
31 r.media_type = ctype
32 return r
33
34 return handle
35
36
37 def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
38 if accept == 'application/nquads':
39 return 'application/nquads', masterGraph.serialize(format='nquads')
40 elif accept == 'application/ld+json':
41 return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
42 else:
43 if accept.startswith('text/html'):
44 return _writeGraphForBrowser(masterGraph)
45 return 'application/x-trig', masterGraph.serialize(format='trig')
46
47
48 def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
49 # We think this is a browser, so respond with a live graph view
50 # (todo)
51 out = (b'''
52 <html><body><pre>''')
53
54 ns = NamespaceManager(masterGraph._graph)
55 # maybe these could be on the PatchableGraph instance
56 ns.bind('ex', 'http://example.com/')
57 ns.bind('', 'http://projects.bigasterisk.com/room/')
58 ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
59 ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
60
61 for s, p, o, g in sorted(masterGraph._graph.quads()):
62 g = g.identifier
63 nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
64 out += html.escape(nquadLine).encode('utf8')
65
66 out += b'''
67 </pre>
68 <p>
69 <a href="#">[refresh]</a>
70 <label><input type="checkbox"> Auto-refresh</label>
71 </p>
72 <script>
73
74 if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
75 document.querySelector("input").checked = true;
76 setTimeout(() => {
77 requestAnimationFrame(() => {
78 window.location.replace(window.location.href);
79 });
80 }, 2000);
81 }
82
83 document.querySelector("a").addEventListener("click", (ev) => {
84 ev.preventDefault();
85 window.location.replace(window.location.href);
86
87 });
88 document.querySelector("input").addEventListener("change", (ev) => {
89 if (document.querySelector("input").checked) {
90 const u = new URL(window.location);
91 u.searchParams.set('autorefresh', 'on');
92 window.location.replace(u.href);
93 } else {
94 const u = new URL(window.location);
95 u.searchParams.delete('autorefresh');
96 window.location.replace(u.href);
97 }
98 });
99
100 </script>
101 </body></html>
102 '''
103 return 'text/html', out
104
105
106 def GraphEvents(masterGraph: PatchableGraph):
107 """
108 e.g.
109 Route('/graph/environment/events', GraphEvents(masterGraph)),
110 """
111
112 async def generateEvents():
113 events = masterGraph.subscribeToPatches()
114 while True: # we'll get cancelled by EventSourceResponse when the conn drops
115 etype, data = await events.get()
116 # Are there more to get? We might throttle and combine patches here- ideally we could see how
117 # long the latency to the client is to make a better rate choice
118 metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
119 with metric.time():
120 yield ServerSentEvent(event=etype, data=data)
121
122 async def handle(request: Request):
123 """
124 One session with one client.
125
126 returns current graph plus future patches to keep remote version
127 in sync with ours.
128
129 instead of turning off buffering all over, it may work for this
130 response to send 'x-accel-buffering: no', per
131 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
132 """
133 return EventSourceResponse(generateEvents())
134
135 return handle