Mercurial > code > home > repos > patchablegraph
comparison patchablegraph.py @ 25:e11d407c46f8
rewrite for asyncio and starlette
author | drewp@bigasterisk.com |
---|---|
date | Sat, 23 Apr 2022 23:58:41 -0700 |
parents | 8d6ba6d372c8 |
children |
comparison
equal
deleted
inserted
replaced
24:1eac25669333 | 25:e11d407c46f8 |
---|---|
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF | 18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF |
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of | 19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of |
20 differences between RDF graphs | 20 differences between RDF graphs |
21 | 21 |
22 """ | 22 """ |
23 import html | 23 import asyncio |
24 import itertools | 24 import itertools |
25 import json | 25 import json |
26 import logging | 26 import logging |
27 from typing import Callable, List, Optional, cast | 27 import weakref |
28 from typing import Callable, List, NewType, Optional, cast, Set | |
28 | 29 |
29 import cyclone.sse | |
30 import cyclone.web | |
31 from cycloneerr import PrettyErrorHandler | |
32 from prometheus_client import Counter, Gauge, Summary | 30 from prometheus_client import Counter, Gauge, Summary |
33 from rdfdb.grapheditapi import GraphEditApi | 31 from rdfdb.grapheditapi import GraphEditApi |
34 from rdfdb.patch import Patch | 32 from rdfdb.patch import Patch |
35 from rdfdb.rdflibpatch import inGraph, patchQuads | 33 from rdfdb.rdflibpatch import inGraph, patchQuads |
36 from rdflib import ConjunctiveGraph | 34 from rdflib import ConjunctiveGraph |
37 from rdflib.namespace import NamespaceManager | |
38 from rdflib.parser import StringInputSource | 35 from rdflib.parser import StringInputSource |
39 from rdflib.plugins.serializers.jsonld import from_rdf | 36 from rdflib.plugins.serializers.jsonld import from_rdf |
40 | 37 |
41 from . import patch_cyclone | 38 JsonSerializedPatch = NewType('JsonSerializedPatch', str) |
39 JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str) | |
42 | 40 |
43 patch_cyclone.patch_sse() | |
44 log = logging.getLogger('patchablegraph') | 41 log = logging.getLogger('patchablegraph') |
45 | 42 |
46 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph']) | 43 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls', labelnames=['graph']) |
47 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph']) | 44 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls', labelnames=['graph']) |
48 STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph']) | 45 STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size', labelnames=['graph']) |
51 | 48 |
52 | 49 |
53 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py | 50 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py |
54 def _graphFromQuads2(q): | 51 def _graphFromQuads2(q): |
55 g = ConjunctiveGraph() | 52 g = ConjunctiveGraph() |
56 #g.addN(q) # no effect on nquad output | 53 # g.addN(q) # no effect on nquad output |
57 for s, p, o, c in q: | 54 for s, p, o, c in q: |
58 g.get_context(c).add((s, p, o)) # kind of works with broken rdflib nquad serializer code | 55 g.get_context(c).add((s, p, o)) # kind of works with broken rdflib nquad serializer code |
59 #g.store.add((s,p,o), c) # no effect on nquad output | 56 # g.store.add((s,p,o), c) # no effect on nquad output |
60 return g | 57 return g |
61 | 58 |
62 | 59 |
63 def jsonFromPatch(p: Patch) -> str: | 60 def jsonFromPatch(p: Patch) -> JsonSerializedPatch: |
64 return json.dumps( | 61 return cast(JsonSerializedPatch, json.dumps({'patch': { |
65 {'patch': { | 62 'adds': from_rdf(_graphFromQuads2(p.addQuads)), |
66 'adds': from_rdf(_graphFromQuads2(p.addQuads)), | 63 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), |
67 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), | 64 }})) |
68 }}) | |
69 | 65 |
70 | 66 |
71 patchAsJson = jsonFromPatch # deprecated name | 67 patchAsJson = jsonFromPatch # deprecated name |
72 | 68 |
73 | 69 |
74 def patchFromJson(j: str) -> Patch: | 70 def patchFromJson(j: JsonSerializedPatch) -> Patch: |
75 body = json.loads(j)['patch'] | 71 body = json.loads(j)['patch'] |
76 a = ConjunctiveGraph() | 72 a = ConjunctiveGraph() |
77 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') | 73 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') |
78 d = ConjunctiveGraph() | 74 d = ConjunctiveGraph() |
79 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') | 75 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') |
80 return Patch(addGraph=a, delGraph=d) | 76 return Patch(addGraph=a, delGraph=d) |
81 | 77 |
82 | 78 |
83 def graphAsJson(g: ConjunctiveGraph) -> str: | 79 def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph: |
84 # This is not the same as g.serialize(format='json-ld')! That | 80 # This is not the same as g.serialize(format='json-ld')! That |
85 # version omits literal datatypes. | 81 # version omits literal datatypes. |
86 return json.dumps(from_rdf(g)) | 82 return cast(JsonLdSerializedGraph, json.dumps(from_rdf(g))) |
87 | 83 |
88 | 84 |
89 _graphsInProcess = itertools.count() | 85 _graphsInProcess = itertools.count() |
90 | 86 |
91 | 87 |
95 updates to all current listeners. | 91 updates to all current listeners. |
96 """ | 92 """ |
97 | 93 |
98 def __init__(self, label: Optional[str] = None): | 94 def __init__(self, label: Optional[str] = None): |
99 self._graph = ConjunctiveGraph() | 95 self._graph = ConjunctiveGraph() |
100 self._observers: List[Callable[[str], None]] = [] | 96 self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet() |
97 | |
101 if label is None: | 98 if label is None: |
102 label = f'patchableGraph{next(_graphsInProcess)}' | 99 label = f'patchableGraph{next(_graphsInProcess)}' |
103 self.label = label | 100 self.label = label |
101 log.info('making %r', label) | |
104 | 102 |
105 def serialize(self, *arg, **kw) -> bytes: | 103 def serialize(self, *arg, **kw) -> bytes: |
106 with SERIALIZE_CALLS.labels(graph=self.label).time(): | 104 with SERIALIZE_CALLS.labels(graph=self.label).time(): |
107 return cast(bytes, self._graph.serialize(*arg, **kw)) | 105 return cast(bytes, self._graph.serialize(*arg, **kw)) |
108 | 106 |
113 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) | 111 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) |
114 minimizedP = Patch(addQuads=adds, delQuads=dels) | 112 minimizedP = Patch(addQuads=adds, delQuads=dels) |
115 if minimizedP.isNoop(): | 113 if minimizedP.isNoop(): |
116 return | 114 return |
117 patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true? | 115 patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true? |
118 for ob in self._observers: | 116 if self._subscriptions: |
119 ob(patchAsJson(p)) | 117 log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions)) |
118 j = patchAsJson(p) | |
119 for q in self._subscriptions: | |
120 q.put_nowait(('patch', j)) | |
120 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) | 121 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) |
121 | 122 |
122 def asJsonLd(self) -> str: | 123 def asJsonLd(self) -> JsonLdSerializedGraph: |
123 return graphAsJson(self._graph) | 124 return graphAsJson(self._graph) |
124 | 125 |
125 def addObserver(self, onPatch: Callable[[str], None]): | 126 def subscribeToPatches(self) -> asyncio.Queue: |
126 self._observers.append(onPatch) | 127 q = asyncio.Queue() |
127 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers)) | 128 qref = weakref.ref(q, self._onUnsubscribe) |
129 self._initialSubscribeEvents(qref) | |
130 return q | |
131 | |
132 def _initialSubscribeEvents(self, qref): | |
133 q = qref() | |
134 log.info('new sub queue %s', q) | |
135 self._subscriptions.add(q) # when caller forgets about queue, we will too | |
136 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) | |
128 OBSERVERS_ADDED.labels(graph=self.label).inc() | 137 OBSERVERS_ADDED.labels(graph=self.label).inc() |
138 q.put_nowait(('graph', self.asJsonLd())) # this should be chunked, or just done as reset + patches | |
129 | 139 |
130 def removeObserver(self, onPatch: Callable[[str], None]): | 140 def _onUnsubscribe(self, qref): |
131 try: | 141 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._subscriptions)) # minus one? |
132 self._observers.remove(onPatch) | |
133 except ValueError: | |
134 pass | |
135 self._currentObservers = len(self._observers) | |
136 | 142 |
137 def setToGraph(self, newGraph: ConjunctiveGraph): | 143 def setToGraph(self, newGraph: ConjunctiveGraph): |
138 self.patch(Patch.fromDiff(self._graph, newGraph)) | 144 self.patch(Patch.fromDiff(self._graph, newGraph)) |
139 | |
140 | |
141 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') | |
142 | |
143 | |
144 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): | |
145 | |
146 def initialize(self, masterGraph: PatchableGraph): | |
147 self.masterGraph = masterGraph | |
148 | |
149 def get(self): | |
150 with SEND_SIMPLE_GRAPH.time(): | |
151 self._writeGraphResponse() | |
152 | |
153 def _writeGraphResponse(self): | |
154 acceptHeader = self.request.headers.get( | |
155 'Accept', | |
156 # see https://github.com/fiorix/cyclone/issues/20 | |
157 self.request.headers.get('accept', '')) | |
158 | |
159 if acceptHeader == 'application/nquads': | |
160 self.set_header('Content-type', 'application/nquads') | |
161 self.masterGraph.serialize(self, format='nquads') | |
162 elif acceptHeader == 'application/ld+json': | |
163 self.set_header('Content-type', 'application/ld+json') | |
164 self.masterGraph.serialize(self, format='json-ld', indent=2) | |
165 else: | |
166 if acceptHeader.startswith('text/html'): | |
167 self._writeGraphForBrowser() | |
168 return | |
169 self.set_header('Content-type', 'application/x-trig') | |
170 self.masterGraph.serialize(self, format='trig') | |
171 | |
172 def _writeGraphForBrowser(self): | |
173 # We think this is a browser, so respond with a live graph view | |
174 # (todo) | |
175 self.set_header('Content-type', 'text/html') | |
176 | |
177 self.write(b''' | |
178 <html><body><pre>''') | |
179 | |
180 ns = NamespaceManager(self.masterGraph._graph) | |
181 # maybe these could be on the PatchableGraph instance | |
182 ns.bind('ex', 'http://example.com/') | |
183 ns.bind('', 'http://projects.bigasterisk.com/room/') | |
184 ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#") | |
185 ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#") | |
186 | |
187 for s, p, o, g in sorted(self.masterGraph._graph.quads()): | |
188 g = g.identifier | |
189 nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n' | |
190 self.write(html.escape(nquadLine).encode('utf8')) | |
191 | |
192 self.write(b''' | |
193 </pre> | |
194 <p> | |
195 <a href="#">[refresh]</a> | |
196 <label><input type="checkbox"> Auto-refresh</label> | |
197 </p> | |
198 <script> | |
199 | |
200 if (new URL(window.location).searchParams.get('autorefresh') == 'on') { | |
201 document.querySelector("input").checked = true; | |
202 setTimeout(() => { | |
203 requestAnimationFrame(() => { | |
204 window.location.replace(window.location.href); | |
205 }); | |
206 }, 2000); | |
207 } | |
208 | |
209 document.querySelector("a").addEventListener("click", (ev) => { | |
210 ev.preventDefault(); | |
211 window.location.replace(window.location.href); | |
212 | |
213 }); | |
214 document.querySelector("input").addEventListener("change", (ev) => { | |
215 if (document.querySelector("input").checked) { | |
216 const u = new URL(window.location); | |
217 u.searchParams.set('autorefresh', 'on'); | |
218 window.location.replace(u.href); | |
219 } else { | |
220 const u = new URL(window.location); | |
221 u.searchParams.delete('autorefresh'); | |
222 window.location.replace(u.href); | |
223 } | |
224 }); | |
225 | |
226 </script> | |
227 </body></html> | |
228 ''') | |
229 | |
230 | |
231 SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') | |
232 SEND_PATCH = Summary('send_patch', 'patch SSE events') | |
233 | |
234 | |
235 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): | |
236 """ | |
237 One session with one client. | |
238 | |
239 returns current graph plus future patches to keep remote version | |
240 in sync with ours. | |
241 | |
242 intsead of turning off buffering all over, it may work for this | |
243 response to send 'x-accel-buffering: no', per | |
244 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering | |
245 """ | |
246 | |
247 def __init__(self, application, request, masterGraph): | |
248 cyclone.sse.SSEHandler.__init__(self, application, request) | |
249 self.masterGraph = masterGraph | |
250 | |
251 def bind(self): | |
252 with SEND_FULL_GRAPH.time(): | |
253 graphJson = self.masterGraph.asJsonLd() | |
254 log.debug("send fullGraph event: %s", graphJson) | |
255 self.sendEvent(message=graphJson, event=b'fullGraph') | |
256 self.masterGraph.addObserver(self.onPatch) | |
257 | |
258 def onPatch(self, patchJson): | |
259 with SEND_PATCH.time(): | |
260 # throttle and combine patches here- ideally we could see how | |
261 # long the latency to the client is to make a better rate choice | |
262 self.sendEvent(message=patchJson, event=b'patch') | |
263 | |
264 def unbind(self): | |
265 self.masterGraph.removeObserver(self.onPatch) | |
266 | |
267 def flush(self): | |
268 if getattr(self.settings, 'allowOrigin'): | |
269 allow = self.settings.allowOrigin | |
270 if allow != b'*': | |
271 raise NotImplementedError() | |
272 self.set_header(b"Access-Control-Allow-Origin", allow) | |
273 return CycloneGraphEventsHandler.flush(self) |