Mercurial > code > home > repos > patchablegraph
annotate patchablegraph.py @ 15:d453aa85416d 0.17.0
release 0.17.0
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 20:00:59 -0800 |
parents | 1b6718a54c00 |
children | 388a5e15d249 |
rev | line source |
---|---|
0 | 1 """ |
2 Design: | |
3 | |
4 1. Services each have (named) graphs, which they patch as things | |
5 change. PatchableGraph is an object for holding this graph. | |
6 2. You can http GET that graph, or ... | |
7 3. You can http GET/SSE that graph and hear about modifications to it | |
8 4. The client that got the graph holds and maintains a copy. The | |
9 client may merge together multiple graphs. | |
10 5. Client queries its graph with low-level APIs or client-side sparql. | |
11 6. When the graph changes, the client knows and can update itself at | |
12 low or high granularity. | |
13 | |
14 | |
15 See also: | |
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF | |
17 models | |
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF | |
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of | |
20 differences between RDF graphs | |
21 | |
22 """ | |
4 | 23 import html |
24 import itertools | |
25 import json | |
26 import logging | |
27 from typing import Callable, List, Optional, cast | |
0 | 28 |
4 | 29 import cyclone.sse |
30 import cyclone.web | |
31 from cycloneerr import PrettyErrorHandler | |
3 | 32 from prometheus_client import Counter, Gauge, Summary |
0 | 33 from rdfdb.grapheditapi import GraphEditApi |
4 | 34 from rdfdb.patch import Patch |
35 from rdfdb.rdflibpatch import inGraph, patchQuads | |
0 | 36 from rdflib import ConjunctiveGraph |
37 from rdflib.namespace import NamespaceManager | |
38 from rdflib.parser import StringInputSource | |
39 from rdflib.plugins.serializers.jsonld import from_rdf | |
9
1b6718a54c00
add a workaround for a str/bytes bug in cyclone SSE
drewp@bigasterisk.com
parents:
4
diff
changeset
|
40 import patch_cyclone |
0 | 41 |
9
1b6718a54c00
add a workaround for a str/bytes bug in cyclone SSE
drewp@bigasterisk.com
parents:
4
diff
changeset
|
# Workaround for a str/bytes bug in cyclone SSE; must run at import time.
patch_cyclone.patch_sse()
log = logging.getLogger('patchablegraph')

# Per-graph metrics. Every use of these in this module goes through
# `.labels(graph=<PatchableGraph.label>)`. prometheus_client only permits
# `.labels()` on a metric that declared its label names at construction
# time (otherwise it raises ValueError), so 'graph' has to be listed here.
SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph'])
PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph'])
STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph'])
OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count', labelnames=['graph'])
OBSERVERS_ADDED = Counter('observers_added', 'observers added', labelnames=['graph'])
50 | |
51 | |
# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """Build a new ConjunctiveGraph containing every quad in `q`.

    `q` is an iterable of (subject, predicate, object, context) tuples.
    Each triple is added through the graph's context for that quad.

    Notes carried over from the original author: `g.addN(q)` and
    `g.store.add((s, p, o), c)` were both tried and had no effect on
    nquad output; adding via get_context() "kind of works with broken
    rdflib nquad serializer code".
    """
    result = ConjunctiveGraph()
    for subj, pred, obj, ctx in q:
        contextGraph = result.get_context(ctx)
        contextGraph.add((subj, pred, obj))
    return result
60 | |
4 | 61 |
def jsonFromPatch(p: Patch) -> str:
    """Return patch `p` as a JSON string.

    The result has the shape
    {"patch": {"adds": ..., "deletes": ...}}
    where each value is what from_rdf() returns for a graph built from
    p.addQuads / p.delQuads respectively.
    """
    addsPart = from_rdf(_graphFromQuads2(p.addQuads))
    deletesPart = from_rdf(_graphFromQuads2(p.delQuads))
    body = {'adds': addsPart, 'deletes': deletesPart}
    return json.dumps({'patch': body})


patchAsJson = jsonFromPatch  # deprecated name
71 | |
72 | |
def patchFromJson(j: str) -> Patch:
    """Parse a JSON string shaped like jsonFromPatch's output into a Patch.

    `j` must decode to an object with a 'patch' key whose value has
    'adds' and 'deletes' entries; each entry is re-encoded to JSON and
    parsed as json-ld into its own ConjunctiveGraph.

    Raises KeyError if 'patch', 'adds' or 'deletes' is missing.
    """
    payload = json.loads(j)['patch']

    def parseSection(key: str) -> ConjunctiveGraph:
        # rdflib's parser wants an input source, so round-trip the
        # already-decoded section back to utf8 JSON bytes.
        graph = ConjunctiveGraph()
        encoded = json.dumps(payload[key]).encode('utf8')
        graph.parse(StringInputSource(encoded), format='json-ld')
        return graph

    addGraph = parseSection('adds')
    delGraph = parseSection('deletes')
    return Patch(addGraph=addGraph, delGraph=delGraph)
80 | |
4 | 81 |
def graphAsJson(g: ConjunctiveGraph) -> str:
    """Return graph `g` as a JSON string, built via from_rdf().

    This is deliberately not g.serialize(format='json-ld'): that
    version omits literal datatypes.
    """
    structure = from_rdf(g)
    return json.dumps(structure)
86 | |
4 | 87 |
# Source of unique suffixes for the default labels of unnamed graphs.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch; each effective patch
    is forwarded, as a JSON string, to all current observers.

    `label` identifies this graph in the prometheus metrics (it is used
    as the value of their 'graph' label).
    """

    def __init__(self, label: Optional[str] = None):
        """Create an empty graph.

        If `label` is None, a process-unique one of the form
        'patchableGraph<N>' is generated.
        """
        self._graph = ConjunctiveGraph()
        self._observers: List[Callable[[str], None]] = []
        if label is None:
            label = f'patchableGraph{next(_graphsInProcess)}'
        self.label = label

    def serialize(self, *arg, **kw) -> bytes:
        """Forward all arguments to the underlying graph's serialize(),
        timing the call under SERIALIZE_CALLS."""
        with SERIALIZE_CALLS.labels(graph=self.label).time():
            return cast(bytes, self._graph.serialize(*arg, **kw))

    def patch(self, p: Patch):
        """Apply patch `p` to the graph and notify observers.

        Deletes of quads that are not in the graph and adds of quads
        that already are in it get dropped first; if nothing remains,
        this returns without touching the graph or calling observers.
        """
        with PATCH_CALLS.labels(graph=self.label).time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
            if self._observers:
                # Serialize once and share the string between observers.
                # NOTE(review): observers receive the caller's patch `p`,
                # not `minimizedP` -- confirm that this is intended.
                patchJson = patchAsJson(p)
                for ob in self._observers:
                    ob(patchJson)
            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))

    def asJsonLd(self) -> str:
        """Return the whole graph as a JSON string (see graphAsJson)."""
        return graphAsJson(self._graph)

    def addObserver(self, onPatch: Callable[[str], None]):
        """Register `onPatch` to be called with the JSON form of each
        future effective patch."""
        self._observers.append(onPatch)
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))
        OBSERVERS_ADDED.labels(graph=self.label).inc()

    def removeObserver(self, onPatch: Callable[[str], None]):
        """Unregister `onPatch`; removing an unknown observer is a no-op."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            # Best effort: the observer was never added or is already gone.
            pass
        # Keep the gauge in step with addObserver so it also goes down
        # when a listener leaves.
        OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))

    def setToGraph(self, newGraph: ConjunctiveGraph):
        """Patch the current graph so its contents become `newGraph`,
        using the diff between the two as the patch."""
        self.patch(Patch.fromDiff(self._graph, newGraph))
138 | |
3 | 139 |
SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')


class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """HTTP GET handler that returns the current contents of a
    PatchableGraph in a format chosen from the request's Accept header."""

    def initialize(self, masterGraph: PatchableGraph):
        """Remember the graph this handler serves."""
        self.masterGraph = masterGraph

    def get(self):
        """Write the graph response, timed under SEND_SIMPLE_GRAPH."""
        with SEND_SIMPLE_GRAPH.time():
            self._writeGraphResponse()

    def _writeGraphResponse(self):
        """Pick the output format from the Accept header and write it.

        'application/nquads' and 'application/ld+json' are matched
        exactly; anything starting with 'text/html' gets the browser
        view; everything else (including no header) gets trig.
        """
        acceptHeader = self.request.headers.get(
            'Accept',
            # see https://github.com/fiorix/cyclone/issues/20
            self.request.headers.get('accept', ''))

        # The handler itself is passed as the first serialize() argument;
        # presumably rdflib treats it as the output destination and calls
        # self.write -- TODO confirm.
        if acceptHeader == 'application/nquads':
            self.set_header('Content-type', 'application/nquads')
            self.masterGraph.serialize(self, format='nquads')
        elif acceptHeader == 'application/ld+json':
            self.set_header('Content-type', 'application/ld+json')
            self.masterGraph.serialize(self, format='json-ld', indent=2)
        elif acceptHeader.startswith('text/html'):
            self._writeGraphForBrowser()
        else:
            self.set_header('Content-type', 'application/x-trig')
            self.masterGraph.serialize(self, format='trig')

    def _writeGraphForBrowser(self):
        """Write an HTML page listing the graph's quads, one per line,
        followed by refresh / auto-refresh controls."""
        # We think this is a browser, so respond with a live graph view
        # (todo)
        # The page body below is written as utf8 bytes, so say so; without
        # a charset the browser has to guess and can garble non-ASCII text.
        self.set_header('Content-type', 'text/html; charset=utf-8')

        self.write(b'''
<html><body><pre>''')

        ns = NamespaceManager(self.masterGraph._graph)
        # maybe these could be on the PatchableGraph instance
        ns.bind('ex', 'http://example.com/')
        ns.bind('', 'http://projects.bigasterisk.com/room/')
        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

        for s, p, o, g in sorted(self.masterGraph._graph.quads()):
            # quads() yields the context as a graph object; print its identifier.
            ctx = g.identifier
            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {ctx.n3(ns)} .\n'
            self.write(html.escape(nquadLine).encode('utf8'))

        self.write(b'''
</pre>
<p>
<a href="#">[refresh]</a>
<label><input type="checkbox"> Auto-refresh</label>
</p>
<script>

if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
document.querySelector("input").checked = true;
setTimeout(() => {
requestAnimationFrame(() => {
window.location.replace(window.location.href);
});
}, 2000);
}

document.querySelector("a").addEventListener("click", (ev) => {
ev.preventDefault();
window.location.replace(window.location.href);

});
document.querySelector("input").addEventListener("change", (ev) => {
if (document.querySelector("input").checked) {
const u = new URL(window.location);
u.searchParams.set('autorefresh', 'on');
window.location.replace(u.href);
} else {
const u = new URL(window.location);
u.searchParams.delete('autorefresh');
window.location.replace(u.href);
}
});

</script>
</body></html>
''')
228 | |
229 | |
SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
SEND_PATCH = Summary('send_patch', 'patch SSE events')


class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    Sends the current graph as a 'fullGraph' event, then every future
    patch as a 'patch' event, so the remote copy stays in sync with
    ours.

    Instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """

    def __init__(self, application, request, masterGraph):
        """Set up the SSE handler and remember the graph to stream."""
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the full graph to the client, then subscribe to patches."""
        source = self.masterGraph
        with SEND_FULL_GRAPH.time():
            fullGraphJson = source.asJsonLd()
            log.debug("send fullGraph event: %s", fullGraphJson)
            self.sendEvent(message=fullGraphJson, event=b'fullGraph')
            source.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Observer callback: forward one patch (a JSON string) to the client."""
        with SEND_PATCH.time():
            # throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop receiving patches for this session."""
        self.masterGraph.removeObserver(self.onPatch)