comparison patchablegraph.py @ 25:e11d407c46f8

rewrite for asyncio and starlette
author drewp@bigasterisk.com
date Sat, 23 Apr 2022 23:58:41 -0700
parents 8d6ba6d372c8
children
comparison
equal deleted inserted replaced
24:1eac25669333 25:e11d407c46f8
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF 18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of 19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
20 differences between RDF graphs 20 differences between RDF graphs
21 21
22 """ 22 """
23 import html 23 import asyncio
24 import itertools 24 import itertools
25 import json 25 import json
26 import logging 26 import logging
27 from typing import Callable, List, Optional, cast 27 import weakref
28 from typing import Callable, List, NewType, Optional, cast, Set
28 29
29 import cyclone.sse
30 import cyclone.web
31 from cycloneerr import PrettyErrorHandler
32 from prometheus_client import Counter, Gauge, Summary 30 from prometheus_client import Counter, Gauge, Summary
33 from rdfdb.grapheditapi import GraphEditApi 31 from rdfdb.grapheditapi import GraphEditApi
34 from rdfdb.patch import Patch 32 from rdfdb.patch import Patch
35 from rdfdb.rdflibpatch import inGraph, patchQuads 33 from rdfdb.rdflibpatch import inGraph, patchQuads
36 from rdflib import ConjunctiveGraph 34 from rdflib import ConjunctiveGraph
37 from rdflib.namespace import NamespaceManager
38 from rdflib.parser import StringInputSource 35 from rdflib.parser import StringInputSource
39 from rdflib.plugins.serializers.jsonld import from_rdf 36 from rdflib.plugins.serializers.jsonld import from_rdf
40 37
41 from . import patch_cyclone 38 JsonSerializedPatch = NewType('JsonSerializedPatch', str)
39 JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)
42 40
43 patch_cyclone.patch_sse()
log = logging.getLogger('patchablegraph')

# Prometheus metrics; each one carries a 'graph' label so that several
# PatchableGraph instances in one process can be told apart.
SERIALIZE_CALLS = Summary(
    'serialize_calls',
    'PatchableGraph.serialize calls',
    labelnames=['graph'],
)
PATCH_CALLS = Summary(
    'patch_calls',
    'PatchableGraph.patch calls',
    labelnames=['graph'],
)
STATEMENT_COUNT = Gauge(
    'statement_count',
    'current PatchableGraph graph size',
    labelnames=['graph'],
)
51 48
52 49
53 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py 50 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """
    Build a new ConjunctiveGraph from an iterable of (s, p, o, c) quads.

    Every triple is added through the graph's context for c. Two other
    approaches, g.addN(q) and g.store.add((s, p, o), c), were tried here
    and had no effect on nquad output.
    """
    graph = ConjunctiveGraph()
    for subj, pred, obj, ctx in q:
        # kind of works with broken rdflib nquad serializer code
        graph.get_context(ctx).add((subj, pred, obj))
    return graph
61 58
62 59
def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
    """
    Serialize a Patch to a JSON string.

    The result is a JSON object shaped like
    {"patch": {"adds": ..., "deletes": ...}}, where each value is the
    from_rdf() output for the patch's added or deleted quads.
    """
    addsJson = from_rdf(_graphFromQuads2(p.addQuads))
    deletesJson = from_rdf(_graphFromQuads2(p.delQuads))
    body = {'patch': {'adds': addsJson, 'deletes': deletesJson}}
    return cast(JsonSerializedPatch, json.dumps(body))


patchAsJson = jsonFromPatch  # deprecated name
72 68
73 69
def patchFromJson(j: JsonSerializedPatch) -> Patch:
    """
    Parse a JSON string with a top-level 'patch' object into a Patch.

    The 'adds' and 'deletes' members are each re-encoded as JSON and
    parsed as json-ld into their own ConjunctiveGraph.
    """

    def parseSide(jsonLd) -> ConjunctiveGraph:
        # One side of the patch (adds or deletes) as its own graph.
        graph = ConjunctiveGraph()
        encoded = json.dumps(jsonLd).encode('utf8')
        graph.parse(StringInputSource(encoded), format='json-ld')
        return graph

    body = json.loads(j)['patch']
    # Keyword arguments evaluate left to right, so adds are parsed first.
    return Patch(addGraph=parseSide(body['adds']), delGraph=parseSide(body['deletes']))
81 77
82 78
def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
    """
    Serialize a graph to a JSON string built from from_rdf(g).

    This is not the same as g.serialize(format='json-ld')! That
    version omits literal datatypes.
    """
    jsonLd = from_rdf(g)
    return cast(JsonLdSerializedGraph, json.dumps(jsonLd))


# Counter that supplies the number in default 'patchableGraphN' labels.
_graphsInProcess = itertools.count()
90 86
91 87
95 updates to all current listeners. 91 updates to all current listeners.
96 """ 92 """
97 93
def __init__(self, label: Optional[str] = None):
    """
    label: name used for this graph in metric labels and log lines. When
    omitted, the next 'patchableGraphN' label is generated.
    """
    self._graph = ConjunctiveGraph()
    # Queues are held weakly, so one leaves this set once nothing else
    # references it.
    self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()

    self.label = label if label is not None else f'patchableGraph{next(_graphsInProcess)}'
    log.info('making %r', self.label)
104 102
def serialize(self, *arg, **kw) -> bytes:
    """
    Serialize the underlying graph, passing every argument straight
    through to its serialize(). The call is timed under SERIALIZE_CALLS
    with this graph's label.
    """
    timer = SERIALIZE_CALLS.labels(graph=self.label).time()
    with timer:
        result = self._graph.serialize(*arg, **kw)
    return cast(bytes, result)
108 106
113 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) 111 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
114 minimizedP = Patch(addQuads=adds, delQuads=dels) 112 minimizedP = Patch(addQuads=adds, delQuads=dels)
115 if minimizedP.isNoop(): 113 if minimizedP.isNoop():
116 return 114 return
117 patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true? 115 patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true?
118 for ob in self._observers: 116 if self._subscriptions:
119 ob(patchAsJson(p)) 117 log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
118 j = patchAsJson(p)
119 for q in self._subscriptions:
120 q.put_nowait(('patch', j))
120 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) 121 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
121 122
def asJsonLd(self) -> JsonLdSerializedGraph:
    """Return the current graph as serialized by graphAsJson()."""
    currentGraph = self._graph
    return graphAsJson(currentGraph)
124 125
def subscribeToPatches(self) -> asyncio.Queue:
    """
    Start a new subscription and return its queue.

    _initialSubscribeEvents registers the queue and puts a
    ('graph', <json>) event holding the current graph on it. This object
    only holds the queue weakly, so the caller unsubscribes simply by
    dropping its reference to the returned queue.
    """
    q = asyncio.Queue()
    # A weakref.ref callback only fires while the ref object itself is
    # still alive. Nothing retains a ref after this call returns, so a
    # callback attached that way would never run and the observer gauge
    # would never go down. weakref.finalize keeps itself registered until
    # the queue is collected.
    # It is created before the queue joins self._subscriptions; per the
    # weakref docs callbacks run newest-first, so the set should already
    # have dropped the queue when _onUnsubscribe reads its length.
    onDrop = weakref.finalize(q, self._onUnsubscribe, None)
    onDrop.atexit = False  # no metric updates during interpreter shutdown
    self._initialSubscribeEvents(weakref.ref(q))
    return q
131
def _initialSubscribeEvents(self, qref):
    """
    Register the queue behind qref as a subscriber, update the observer
    metrics, and queue the current graph as its first event.

    qref: a callable (weak reference) returning the subscriber's queue.
    """
    queue = qref()
    log.info('new sub queue %s', queue)
    self._subscriptions.add(queue)  # when caller forgets about queue, we will too

    graphLabel = self.label
    OBSERVERS_CURRENT.labels(graph=graphLabel).set(len(self._subscriptions))
    OBSERVERS_ADDED.labels(graph=graphLabel).inc()

    # this should be chunked, or just done as reset + patches
    queue.put_nowait(('graph', self.asJsonLd()))
129 139
def _onUnsubscribe(self, qref):
    """
    Callback for a subscription queue that has gone away: set the
    OBSERVERS_CURRENT gauge from the current size of the subscription set.

    qref: the weak reference that triggered the callback; unused.
    """
    remaining = len(self._subscriptions)  # minus one?
    OBSERVERS_CURRENT.labels(graph=self.label).set(remaining)
136 142
def setToGraph(self, newGraph: ConjunctiveGraph):
    """
    Make this graph's contents match newGraph by computing the diff from
    the current graph and applying it through patch().
    """
    diff = Patch.fromDiff(self._graph, newGraph)
    self.patch(diff)
139
140
SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')


class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    GET handler that writes masterGraph in a format chosen from the
    request's Accept header.
    """

    def initialize(self, masterGraph: PatchableGraph):
        self.masterGraph = masterGraph

    def get(self):
        # The whole response is timed under SEND_SIMPLE_GRAPH.
        timer = SEND_SIMPLE_GRAPH.time()
        with timer:
            self._writeGraphResponse()

    def _writeGraphResponse(self):
        """
        Write the graph as nquads or json-ld when the Accept header is
        exactly that media type, as an HTML page when it starts with
        text/html, and as trig otherwise.
        """
        headers = self.request.headers
        # see https://github.com/fiorix/cyclone/issues/20
        lowercaseValue = headers.get('accept', '')
        acceptHeader = headers.get('Accept', lowercaseValue)

        # Accept values that must match exactly -> serialize() options.
        exactMatches = {
            'application/nquads': dict(format='nquads'),
            'application/ld+json': dict(format='json-ld', indent=2),
        }
        if acceptHeader in exactMatches:
            self.set_header('Content-type', acceptHeader)
            self.masterGraph.serialize(self, **exactMatches[acceptHeader])
            return

        if acceptHeader.startswith('text/html'):
            self._writeGraphForBrowser()
            return

        self.set_header('Content-type', 'application/x-trig')
        self.masterGraph.serialize(self, format='trig')

    def _writeGraphForBrowser(self):
        """Write an HTML page listing every quad, with refresh controls."""
        # We think this is a browser, so respond with a live graph view
        # (todo)
        self.set_header('Content-type', 'text/html')

        self.write(b'''
<html><body><pre>''')

        graph = self.masterGraph._graph
        ns = NamespaceManager(graph)
        # maybe these could be on the PatchableGraph instance
        ns.bind('ex', 'http://example.com/')
        ns.bind('', 'http://projects.bigasterisk.com/room/')
        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

        for s, p, o, ctx in sorted(graph.quads()):
            # The fourth column shows the context's identifier.
            terms = (s, p, o, ctx.identifier)
            nquadLine = ' '.join(term.n3(ns) for term in terms) + ' .\n'
            self.write(html.escape(nquadLine).encode('utf8'))

        self.write(b'''
</pre>
<p>
<a href="#">[refresh]</a>
<label><input type="checkbox"> Auto-refresh</label>
</p>
<script>

if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
document.querySelector("input").checked = true;
setTimeout(() => {
requestAnimationFrame(() => {
window.location.replace(window.location.href);
});
}, 2000);
}

document.querySelector("a").addEventListener("click", (ev) => {
ev.preventDefault();
window.location.replace(window.location.href);

});
document.querySelector("input").addEventListener("change", (ev) => {
if (document.querySelector("input").checked) {
const u = new URL(window.location);
u.searchParams.set('autorefresh', 'on');
window.location.replace(u.href);
} else {
const u = new URL(window.location);
u.searchParams.delete('autorefresh');
window.location.replace(u.href);
}
});

</script>
</body></html>
''')
229
230
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')


class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    returns current graph plus future patches to keep remote version
    in sync with ours.

    instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """

    def __init__(self, application, request, masterGraph):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the whole graph as a 'fullGraph' event, then observe patches."""
        with SEND_FULL_GRAPH.time():
            graphJson = self.masterGraph.asJsonLd()
            log.debug("send fullGraph event: %s", graphJson)
            self.sendEvent(message=graphJson, event=b'fullGraph')
            self.masterGraph.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Forward one serialized patch to the client as a 'patch' event."""
        with SEND_PATCH.time():
            # throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop observing the graph for this session."""
        self.masterGraph.removeObserver(self.onPatch)

    def flush(self):
        """
        Add the Access-Control-Allow-Origin header when the allowOrigin
        setting is present, then flush through the base SSE handler.

        Raises NotImplementedError for any allowOrigin other than b'*'.
        """
        # Default to None so a missing setting means "no CORS header"
        # instead of an AttributeError on every flush.
        allow = getattr(self.settings, 'allowOrigin', None)
        if allow:
            if allow != b'*':
                raise NotImplementedError()
            self.set_header(b"Access-Control-Allow-Origin", allow)
        # Delegate to the base class. Calling
        # CycloneGraphEventsHandler.flush here would re-enter this method
        # and recurse until RecursionError.
        return cyclone.sse.SSEHandler.flush(self)