25
|
1 import html
|
|
2 import logging
|
|
3 from typing import Callable, Tuple
|
|
4
|
|
5 from prometheus_client import Summary
|
|
6 from rdflib.namespace import NamespaceManager
|
|
7 from sse_starlette import ServerSentEvent
|
|
8 from sse_starlette.sse import EventSourceResponse
|
|
9 from starlette.requests import Request
|
|
10 from starlette.responses import Response
|
|
11
|
|
12 from patchablegraph import PatchableGraph
|
|
13
|
|
14 log = logging.getLogger('patchablegraph')
|
|
15
|
|
16 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
|
|
17 SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
|
|
18 SEND_PATCH = Summary('send_patch', 'patch SSE events')
|
|
19
|
|
20
|
|
21 def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
|
|
22 """
|
|
23 e.g.
|
|
24 Route('/graph/environment', StaticGraph(masterGraph)),
|
|
25 """
|
|
26
|
|
27 @SEND_SIMPLE_GRAPH.time()
|
|
28 def handle(request: Request) -> Response:
|
|
29 ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
|
|
30 r = Response(content=content)
|
|
31 r.media_type = ctype
|
|
32 return r
|
|
33
|
|
34 return handle
|
|
35
|
|
36
|
|
37 def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
|
|
38 if accept == 'application/nquads':
|
|
39 return 'application/nquads', masterGraph.serialize(format='nquads')
|
|
40 elif accept == 'application/ld+json':
|
|
41 return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
|
|
42 else:
|
|
43 if accept.startswith('text/html'):
|
|
44 return _writeGraphForBrowser(masterGraph)
|
|
45 return 'application/x-trig', masterGraph.serialize(format='trig')
|
|
46
|
|
47
|
|
48 def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
|
|
49 # We think this is a browser, so respond with a live graph view
|
|
50 # (todo)
|
|
51 out = (b'''
|
|
52 <html><body><pre>''')
|
|
53
|
|
54 ns = NamespaceManager(masterGraph._graph)
|
|
55 # maybe these could be on the PatchableGraph instance
|
|
56 ns.bind('ex', 'http://example.com/')
|
|
57 ns.bind('', 'http://projects.bigasterisk.com/room/')
|
|
58 ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
|
|
59 ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")
|
|
60
|
|
61 for s, p, o, g in sorted(masterGraph._graph.quads()):
|
|
62 g = g.identifier
|
|
63 nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
|
|
64 out += html.escape(nquadLine).encode('utf8')
|
|
65
|
|
66 out += b'''
|
|
67 </pre>
|
|
68 <p>
|
|
69 <a href="#">[refresh]</a>
|
|
70 <label><input type="checkbox"> Auto-refresh</label>
|
|
71 </p>
|
|
72 <script>
|
|
73
|
|
74 if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
|
|
75 document.querySelector("input").checked = true;
|
|
76 setTimeout(() => {
|
|
77 requestAnimationFrame(() => {
|
|
78 window.location.replace(window.location.href);
|
|
79 });
|
|
80 }, 2000);
|
|
81 }
|
|
82
|
|
83 document.querySelector("a").addEventListener("click", (ev) => {
|
|
84 ev.preventDefault();
|
|
85 window.location.replace(window.location.href);
|
|
86
|
|
87 });
|
|
88 document.querySelector("input").addEventListener("change", (ev) => {
|
|
89 if (document.querySelector("input").checked) {
|
|
90 const u = new URL(window.location);
|
|
91 u.searchParams.set('autorefresh', 'on');
|
|
92 window.location.replace(u.href);
|
|
93 } else {
|
|
94 const u = new URL(window.location);
|
|
95 u.searchParams.delete('autorefresh');
|
|
96 window.location.replace(u.href);
|
|
97 }
|
|
98 });
|
|
99
|
|
100 </script>
|
|
101 </body></html>
|
|
102 '''
|
|
103 return 'text/html', out
|
|
104
|
|
105
|
|
106 def GraphEvents(masterGraph: PatchableGraph):
|
|
107 """
|
|
108 e.g.
|
|
109 Route('/graph/environment/events', GraphEvents(masterGraph)),
|
|
110 """
|
|
111
|
|
112 async def generateEvents():
|
|
113 events = masterGraph.subscribeToPatches()
|
|
114 while True: # we'll get cancelled by EventSourceResponse when the conn drops
|
|
115 etype, data = await events.get()
|
|
116 # Are there more to get? We might throttle and combine patches here- ideally we could see how
|
|
117 # long the latency to the client is to make a better rate choice
|
|
118 metric = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}[etype]
|
|
119 with metric.time():
|
|
120 yield ServerSentEvent(event=etype, data=data)
|
|
121
|
|
122 async def handle(request: Request):
|
|
123 """
|
|
124 One session with one client.
|
|
125
|
|
126 returns current graph plus future patches to keep remote version
|
|
127 in sync with ours.
|
|
128
|
|
129 instead of turning off buffering all over, it may work for this
|
|
130 response to send 'x-accel-buffering: no', per
|
|
131 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
|
|
132 """
|
|
133 return EventSourceResponse(generateEvents())
|
|
134
|
|
135 return handle
|