Mercurial > code > home > repos > patchablegraph
comparison patchablegraph_handler.py @ 25:e11d407c46f8
rewrite for asyncio and starlette
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Apr 2022 23:58:41 -0700 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
24:1eac25669333 | 25:e11d407c46f8 |
---|---|
1 import html | |
2 import logging | |
3 from typing import Callable, Tuple | |
4 | |
5 from prometheus_client import Summary | |
6 from rdflib.namespace import NamespaceManager | |
7 from sse_starlette import ServerSentEvent | |
8 from sse_starlette.sse import EventSourceResponse | |
9 from starlette.requests import Request | |
10 from starlette.responses import Response | |
11 | |
12 from patchablegraph import PatchableGraph | |
13 | |
# Module-level logger, registered under the name 'patchablegraph'.
log = logging.getLogger('patchablegraph')

# prometheus_client Summary metrics for the handlers in this module:
# SEND_SIMPLE_GRAPH wraps the StaticGraph request handler (decorator form);
# SEND_FULL_GRAPH and SEND_PATCH wrap the yield of 'graph' and 'patch'
# events, respectively, in the GraphEvents generator.
SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')
19 | |
20 | |
def StaticGraph(masterGraph: PatchableGraph) -> Callable[[Request], Response]:
    """
    Build a request handler that answers with one serialization of masterGraph.

    e.g.
        Route('/graph/environment', StaticGraph(masterGraph)),

    The format is chosen by _writeGraphResponse from the request's Accept
    header (an absent header is treated as ''). Each call of the returned
    handler is timed with the SEND_SIMPLE_GRAPH summary.
    """

    @SEND_SIMPLE_GRAPH.time()
    def handle(request: Request) -> Response:
        ctype, content = _writeGraphResponse(masterGraph, request.headers.get('accept', default=''))
        # media_type must be given to the constructor: starlette computes the
        # content-type header while the Response is being built, so assigning
        # the attribute afterwards leaves the response without that header.
        return Response(content=content, media_type=ctype)

    return handle
35 | |
36 | |
def _writeGraphResponse(masterGraph: PatchableGraph, accept: str) -> Tuple[str, bytes]:
    """
    Choose a serialization of masterGraph from an HTTP Accept header value.

    Returns a (content type, body) pair:
      - 'application/nquads'  -> masterGraph.serialize(format='nquads')
      - 'application/ld+json' -> masterGraph.serialize(format='json-ld', indent=2)
      - header starting with 'text/html' -> the browser page from
        _writeGraphForBrowser
      - anything else (including '') -> 'application/x-trig' with
        masterGraph.serialize(format='trig')

    Only the first media range of the header is considered, and no q-value
    ranking is attempted.
    """
    # Strip parameters (';q=0.9') and any further ranges (', */*') so that a
    # client naming one of our formats first still gets it; media type names
    # are compared case-insensitively.
    preferred = accept.split(',', 1)[0].split(';', 1)[0].strip().lower()

    if preferred == 'application/nquads':
        return 'application/nquads', masterGraph.serialize(format='nquads')
    if preferred == 'application/ld+json':
        return 'application/ld+json', masterGraph.serialize(format='json-ld', indent=2)
    if accept.startswith('text/html'):
        return _writeGraphForBrowser(masterGraph)
    return 'application/x-trig', masterGraph.serialize(format='trig')
46 | |
47 | |
def _writeGraphForBrowser(masterGraph: PatchableGraph) -> Tuple[str, bytes]:
    """
    Render masterGraph as an HTML page and return ('text/html', page bytes).

    Every quad of masterGraph._graph is written, in sorted order, as one
    n3-formatted line (using the prefixes bound below), HTML-escaped, inside
    a <pre> element. The page ends with a refresh link and an "Auto-refresh"
    checkbox; with ?autorefresh=on in the URL the embedded script reloads the
    page after 2 seconds.
    """
    # We think this is a browser, so respond with a live graph view
    # (todo)
    ns = NamespaceManager(masterGraph._graph)
    # maybe these could be on the PatchableGraph instance
    ns.bind('ex', 'http://example.com/')
    ns.bind('', 'http://projects.bigasterisk.com/room/')
    ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
    ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

    # Collect the pieces and join once at the end; growing a bytes object with
    # += inside the loop would copy the whole page for every quad.
    chunks = [b'''
<html><body><pre>''']

    for s, p, o, g in sorted(masterGraph._graph.quads()):
        ctx = g.identifier
        nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {ctx.n3(ns)} .\n'
        chunks.append(html.escape(nquadLine).encode('utf8'))

    chunks.append(b'''
</pre>
<p>
<a href="#">[refresh]</a>
<label><input type="checkbox"> Auto-refresh</label>
</p>
<script>

if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
document.querySelector("input").checked = true;
setTimeout(() => {
requestAnimationFrame(() => {
window.location.replace(window.location.href);
});
}, 2000);
}

document.querySelector("a").addEventListener("click", (ev) => {
ev.preventDefault();
window.location.replace(window.location.href);

});
document.querySelector("input").addEventListener("change", (ev) => {
if (document.querySelector("input").checked) {
const u = new URL(window.location);
u.searchParams.set('autorefresh', 'on');
window.location.replace(u.href);
} else {
const u = new URL(window.location);
u.searchParams.delete('autorefresh');
window.location.replace(u.href);
}
});

</script>
</body></html>
''')
    return 'text/html', b''.join(chunks)
104 | |
105 | |
def GraphEvents(masterGraph: PatchableGraph):
    """
    Build a request handler that streams masterGraph as server-sent events.

    e.g.
        Route('/graph/environment/events', GraphEvents(masterGraph)),
    """

    async def generateEvents():
        patchQueue = masterGraph.subscribeToPatches()
        timers = {'graph': SEND_FULL_GRAPH, 'patch': SEND_PATCH}
        # No exit condition here: EventSourceResponse cancels this generator
        # when the connection drops.
        while True:
            eventType, payload = await patchQueue.get()
            # Are there more to get? We might throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            with timers[eventType].time():
                yield ServerSentEvent(event=eventType, data=payload)

    async def handle(request: Request):
        """
        Serve one session with one client.

        The response carries the current graph followed by future patches,
        so the remote copy stays in sync with ours.

        Rather than turning off buffering all over, it may work for this
        response to send 'x-accel-buffering: no', per
        http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
        """
        eventStream = generateEvents()
        return EventSourceResponse(eventStream)

    return handle