0
|
1 """
|
|
2 Design:
|
|
3
|
|
4 1. Services each have (named) graphs, which they patch as things
|
|
5 change. PatchableGraph is an object for holding this graph.
|
|
6 2. You can http GET that graph, or ...
|
|
7 3. You can http GET/SSE that graph and hear about modifications to it
|
|
8 4. The client that got the graph holds and maintains a copy. The
|
|
9 client may merge together multiple graphs.
|
|
10 5. Client queries its graph with low-level APIs or client-side sparql.
|
|
11 6. When the graph changes, the client knows and can update itself at
|
|
12 low or high granularity.
|
|
13
|
|
14
|
|
15 See also:
|
|
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
|
|
17 models
|
|
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
|
|
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
|
|
20 differences between RDF graphs
|
|
21
|
|
22 """
|
4
|
23 import html
|
|
24 import itertools
|
|
25 import json
|
|
26 import logging
|
|
27 from typing import Callable, List, Optional, cast
|
0
|
28
|
4
|
29 import cyclone.sse
|
|
30 import cyclone.web
|
|
31 from cycloneerr import PrettyErrorHandler
|
3
|
32 from prometheus_client import Counter, Gauge, Summary
|
0
|
33 from rdfdb.grapheditapi import GraphEditApi
|
4
|
34 from rdfdb.patch import Patch
|
|
35 from rdfdb.rdflibpatch import inGraph, patchQuads
|
0
|
36 from rdflib import ConjunctiveGraph
|
|
37 from rdflib.namespace import NamespaceManager
|
|
38 from rdflib.parser import StringInputSource
|
|
39 from rdflib.plugins.serializers.jsonld import from_rdf
|
|
40
|
|
log = logging.getLogger('patchablegraph')

# Per-graph metrics. PatchableGraph always accesses these through
# .labels(graph=<label>), so each one has to declare the 'graph' label name;
# prometheus_client raises ValueError from .labels() on a metric that was
# constructed without any label names.
SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])
|
|
48
|
|
49
|
0
|
# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """Return a new ConjunctiveGraph containing every (s, p, o, c) quad in q.

    Each triple is added to the context obtained from get_context(c).
    """
    graph = ConjunctiveGraph()
    # Alternatives that were tried and had no effect on nquad output:
    #   graph.addN(q)
    #   graph.store.add((s, p, o), c)
    for subj, pred, obj, ctx in q:
        # kind of works with broken rdflib nquad serializer code
        context = graph.get_context(ctx)
        context.add((subj, pred, obj))
    return graph
|
|
58
|
4
|
59
|
|
def jsonFromPatch(p: Patch) -> str:
    """Serialize a Patch to a JSON string.

    The result has the shape {"patch": {"adds": ..., "deletes": ...}}, where
    each value is the from_rdf() output for a graph built from the patch's
    addQuads / delQuads.
    """
    addsJsonLd = from_rdf(_graphFromQuads2(p.addQuads))
    deletesJsonLd = from_rdf(_graphFromQuads2(p.delQuads))
    payload = {'patch': {'adds': addsJsonLd, 'deletes': deletesJsonLd}}
    return json.dumps(payload)


patchAsJson = jsonFromPatch  # deprecated name
|
|
69
|
|
70
|
|
def patchFromJson(j: str) -> Patch:
    """Parse a JSON string of the form {"patch": {"adds": ..., "deletes": ...}}
    into a Patch.

    Each of the two values is re-encoded as JSON and parsed as json-ld into
    its own ConjunctiveGraph. Raises KeyError if an expected key is missing.
    """
    body = json.loads(j)['patch']

    def _parsedGraph(key: str) -> ConjunctiveGraph:
        # The parser wants a byte source, so round-trip the sub-document
        # through json.dumps.
        graph = ConjunctiveGraph()
        encoded = json.dumps(body[key]).encode('utf8')
        graph.parse(StringInputSource(encoded), format='json-ld')
        return graph

    addGraph = _parsedGraph('adds')
    delGraph = _parsedGraph('deletes')
    return Patch(addGraph=addGraph, delGraph=delGraph)
|
|
78
|
4
|
79
|
|
def graphAsJson(g: ConjunctiveGraph) -> str:
    """Return the graph as a JSON string built from from_rdf(g).

    This is not the same as g.serialize(format='json-ld')! That
    version omits literal datatypes.
    """
    jsonLd = from_rdf(g)
    return json.dumps(jsonLd)
|
|
84
|
4
|
85
|
0
|
# Process-wide counter used to build default labels for unlabeled graphs.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.

    Listeners are callables registered with addObserver; each one is called
    with the JSON form of every patch that changes the graph. `label` is used
    as the 'graph' label on this module's prometheus metrics.
    """

    def __init__(self, label: Optional[str] = None):
        """Create an empty graph.

        label: name for this graph in metrics. When omitted, a name of the
        form 'patchableGraph<N>' is generated from a process-wide counter.
        """
        self._graph = ConjunctiveGraph()
        self._observers: List[Callable[[str], None]] = []
        if label is None:
            label = f'patchableGraph{next(_graphsInProcess)}'
        self.label = label

    def serialize(self, *arg, **kw) -> bytes:
        """Serialize the underlying graph (timed under SERIALIZE_CALLS).

        All arguments are passed straight through to
        ConjunctiveGraph.serialize.
        """
        with SERIALIZE_CALLS.labels(graph=self.label).time():
            return cast(bytes, self._graph.serialize(*arg, **kw))

    def patch(self, p: Patch):
        """Apply p to the graph and notify every observer.

        Deletes of quads that are not in the graph and adds of quads that are
        already present are dropped before applying. If nothing is left, the
        call returns without touching the graph or calling observers.
        """
        with PATCH_CALLS.labels(graph=self.label).time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
            if self._observers:
                # Observers receive the patch as the caller gave it (not the
                # minimized one). Serialize it once, not once per observer.
                patchJson = patchAsJson(p)
                # Iterate a snapshot so an observer that removes itself while
                # being notified does not cause another observer to be skipped.
                for ob in tuple(self._observers):
                    ob(patchJson)
            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))

    def asJsonLd(self) -> str:
        """Return the whole graph as a JSON string (see graphAsJson)."""
        return graphAsJson(self._graph)

    def addObserver(self, onPatch: Callable[[str], None]):
        """Register onPatch to be called with the JSON of each applied patch."""
        self._observers.append(onPatch)
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))
        OBSERVERS_ADDED.labels(graph=self.label).inc()

    def removeObserver(self, onPatch: Callable[[str], None]):
        """Unregister onPatch; removing an unknown observer is a no-op."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass
        # Keep the gauge in step with addObserver so it can go back down.
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))

    def setToGraph(self, newGraph: ConjunctiveGraph):
        """Patch the graph with the diff from its current contents to newGraph."""
        self.patch(Patch.fromDiff(self._graph, newGraph))
|
|
136
|
3
|
137
|
|
SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')


class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Plain HTTP GET view of a PatchableGraph.

    The response format is picked from the request's Accept header:
    n-quads, json-ld, an HTML page for browsers, or trig for anything else.
    """

    def initialize(self, masterGraph: PatchableGraph):
        """Store the graph this handler serves."""
        self.masterGraph = masterGraph

    def get(self):
        """Write the graph response, timed under SEND_SIMPLE_GRAPH."""
        with SEND_SIMPLE_GRAPH.time():
            self._writeGraphResponse()

    def _writeGraphResponse(self):
        """Serialize the graph into the response in the requested format."""
        headers = self.request.headers
        # see https://github.com/fiorix/cyclone/issues/20
        lowercaseAccept = headers.get('accept', '')
        acceptHeader = headers.get('Accept', lowercaseAccept)

        # The two RDF types are matched exactly, not parsed as an Accept list.
        if acceptHeader == 'application/nquads':
            self.set_header('Content-type', 'application/nquads')
            self.masterGraph.serialize(self, format='nquads')
            return

        if acceptHeader == 'application/ld+json':
            self.set_header('Content-type', 'application/ld+json')
            self.masterGraph.serialize(self, format='json-ld', indent=2)
            return

        if acceptHeader.startswith('text/html'):
            self._writeGraphForBrowser()
            return

        # Everything else gets trig.
        self.set_header('Content-type', 'application/x-trig')
        self.masterGraph.serialize(self, format='trig')

    def _writeGraphForBrowser(self):
        """Write an HTML page listing the graph's quads, with refresh controls."""
        # We think this is a browser, so respond with a live graph view
        # (todo)
        self.set_header('Content-type', 'text/html')

        self.write(b'''
        <html><body><pre>''')

        graph = self.masterGraph._graph
        ns = NamespaceManager(graph)
        # maybe these could be on the PatchableGraph instance
        ns.bind('ex', 'http://example.com/')
        ns.bind('', 'http://projects.bigasterisk.com/room/')
        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

        # One escaped n-quad-style line per statement, in sorted order.
        for s, p, o, ctx in sorted(graph.quads()):
            g = ctx.identifier
            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
            escapedLine = html.escape(nquadLine)
            self.write(escapedLine.encode('utf8'))

        self.write(b'''
        </pre>
        <p>
          <a href="#">[refresh]</a>
          <label><input type="checkbox"> Auto-refresh</label>
        </p>
        <script>

        if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
          document.querySelector("input").checked = true;
          setTimeout(() => {
            requestAnimationFrame(() => {
              window.location.replace(window.location.href);
            });
          }, 2000);
        }

        document.querySelector("a").addEventListener("click", (ev) => {
          ev.preventDefault();
          window.location.replace(window.location.href);

        });
        document.querySelector("input").addEventListener("change", (ev) => {
          if (document.querySelector("input").checked) {
             const u = new URL(window.location);
             u.searchParams.set('autorefresh', 'on');
             window.location.replace(u.href);
          } else {
             const u = new URL(window.location);
             u.searchParams.delete('autorefresh');
             window.location.replace(u.href);
          }
        });

        </script>
        </body></html>
        ''')
|
|
226
|
|
227
|
3
|
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')


class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    Sends the current graph as a 'fullGraph' event, then each later patch as
    a 'patch' event, to keep the remote version in sync with ours.

    Instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """

    def __init__(self, application, request, masterGraph):
        """Set up the SSE handler and remember the graph to stream."""
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the full graph, then subscribe to future patches."""
        master = self.masterGraph
        with SEND_FULL_GRAPH.time():
            fullGraphJson = master.asJsonLd()
            log.debug("send fullGraph event: %s", fullGraphJson)
            self.sendEvent(message=fullGraphJson, event=b'fullGraph')
            master.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Forward one patch (already JSON) to the client as a 'patch' event."""
        # throttle and combine patches here- ideally we could see how
        # long the latency to the client is to make a better rate choice
        with SEND_PATCH.time():
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop receiving patches for this session."""
        self.masterGraph.removeObserver(self.onPatch)
|