comparison patchablegraph.py @ 0:c3f0a692c4cb

move repo from homeauto/lib/
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 10:20:55 -0800
parents
children 703adc4f78b1
comparison
equal deleted inserted replaced
-1:000000000000 0:c3f0a692c4cb
1 """
2 Design:
3
4 1. Services each have (named) graphs, which they patch as things
5 change. PatchableGraph is an object for holding this graph.
6 2. You can http GET that graph, or ...
7 3. You can http GET/SSE that graph and hear about modifications to it
8 4. The client that got the graph holds and maintains a copy. The
9 client may merge together multiple graphs.
10 5. Client queries its graph with low-level APIs or client-side sparql.
11 6. When the graph changes, the client knows and can update itself at
12 low or high granularity.
13
14
15 See also:
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
17 models
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
20 differences between RDF graphs
21
22 """
23 import json, logging, itertools, html
24
25 from greplin import scales
26 from rdfdb.grapheditapi import GraphEditApi
27 from rdflib import ConjunctiveGraph
28 from rdflib.namespace import NamespaceManager
29 from rdflib.parser import StringInputSource
30 from rdflib.plugins.serializers.jsonld import from_rdf
31 import cyclone.sse
32 from cycloneerr import PrettyErrorHandler
33 from rdfdb.patch import Patch
34 from rdfdb.rdflibpatch import patchQuads, inGraph
35
log = logging.getLogger('patchablegraph')


# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """Build a ConjunctiveGraph holding every (s, p, o, c) quad in *q*.

    Each triple is added to the context named by its c, via
    get_context(c).add(...). Per the original notes, the alternatives
    (g.addN(q) and g.store.add((s, p, o), c)) had no effect on nquad output,
    while this form "kind of works" with the rdflib nquad serializer in use.
    """
    graph = ConjunctiveGraph()
    for subj, pred, obj, ctx in q:
        graph.get_context(ctx).add((subj, pred, obj))
    return graph
46
def jsonFromPatch(p):
    """Encode patch *p* as a JSON string.

    The result has the shape {"patch": {"adds": ..., "deletes": ...}}. Each
    side is the structure rdflib's from_rdf produces for a graph built from
    that side's quads (p.addQuads / p.delQuads).
    """
    addsLd = from_rdf(_graphFromQuads2(p.addQuads))
    deletesLd = from_rdf(_graphFromQuads2(p.delQuads))
    return json.dumps({'patch': {'adds': addsLd, 'deletes': deletesLd}})


patchAsJson = jsonFromPatch  # deprecated name
53
54
def patchFromJson(j):
    """Decode JSON text of the form produced by jsonFromPatch into a Patch.

    The 'adds' and 'deletes' members of the top-level 'patch' object are
    each re-serialized and parsed as JSON-LD into their own ConjunctiveGraph.
    """
    def parsedGraph(jsonLd):
        # One fresh graph per side, loaded from UTF-8 encoded JSON-LD text.
        graph = ConjunctiveGraph()
        source = StringInputSource(json.dumps(jsonLd).encode('utf8'))
        graph.parse(source, format='json-ld')
        return graph

    body = json.loads(j)['patch']
    return Patch(addGraph=parsedGraph(body['adds']),
                 delGraph=parsedGraph(body['deletes']))
62
def graphAsJson(g):
    """Return graph *g* as a JSON string built from rdflib's from_rdf output.

    Deliberately not g.serialize(format='json-ld'): per the original note,
    that serializer omits literal datatypes.
    """
    expanded = from_rdf(g)
    return json.dumps(expanded)
67
# Per-process counter so each PatchableGraph gets its own scales path.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch. Every change that is
    actually applied is forwarded, as patch JSON text, to all currently
    registered observers.
    """

    def __init__(self):
        self._graph = ConjunctiveGraph()
        # Callables taking one argument: the JSON text of an applied patch.
        self._observers = []
        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))

    _serialize = scales.PmfStat('serialize')

    def serialize(self, *arg, **kw):
        """Serialize the master graph; all arguments go to the
        underlying ConjunctiveGraph.serialize unchanged."""
        with self._serialize.time():
            return self._graph.serialize(*arg, **kw)

    _patch = scales.PmfStat('patch')
    _len = scales.IntStat('statementCount')

    def patch(self, p):
        """Apply patch *p* to the master graph and notify observers.

        *p* is first reduced to the quads that really change the graph:
        deletes of quads that are present and adds of quads that are absent.
        If that reduced patch is a no-op, nothing is touched and no observer
        is called. Otherwise observers receive the reduced patch, so they
        hear exactly what changed in this graph.
        """
        with self._patch.time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph,
                       deleteQuads=dels,
                       addQuads=adds,
                       perfect=False)  # true?
            # Update the stat before notifying, so a failing observer
            # cannot leave it stale.
            self._len = len(self._graph)

            # Serialize once for all observers instead of once per observer.
            patchJson = jsonFromPatch(minimizedP)
            # Iterate over a snapshot: an observer may unregister itself
            # (e.g. an SSE session unbinding) while being notified, and
            # removing from the live list mid-iteration would make the loop
            # skip the following observer.
            for ob in list(self._observers):
                ob(patchJson)

    def asJsonLd(self):
        """Return the whole master graph as JSON text (see graphAsJson)."""
        return graphAsJson(self._graph)

    _currentObservers = scales.IntStat('observers/current')
    _observersAdded = scales.IntStat('observers/added')

    def addObserver(self, onPatch):
        """Register onPatch(patchJson), called after each applied patch."""
        self._observers.append(onPatch)
        self._currentObservers = len(self._observers)
        self._observersAdded += 1

    def removeObserver(self, onPatch):
        """Unregister onPatch. An observer that is not registered is
        silently ignored."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass  # deliberately tolerant of unknown / already-removed observers
        self._currentObservers = len(self._observers)

    def setToGraph(self, newGraph):
        """Patch the master graph toward newGraph using
        Patch.fromDiff(current graph, newGraph)."""
        self.patch(Patch.fromDiff(self._graph, newGraph))

    # Timers read by the HTTP/SSE handlers that serve this graph.
    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
    _sendPatch = scales.PmfStat('serve/events/sendPatch')
125
126
class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Serve a snapshot of a PatchableGraph for a plain HTTP GET.

    The format is picked from the Accept header: N-Quads for exactly
    'application/nquads', JSON-LD for exactly 'application/ld+json', an HTML
    view when the header starts with 'text/html', and TriG otherwise.
    """

    def initialize(self, masterGraph: PatchableGraph):
        self.masterGraph = masterGraph

    def get(self):
        with self.masterGraph._sendSimpleGraph.time():
            self._writeGraphResponse()

    def _writeGraphResponse(self):
        acceptHeader = self.request.headers.get(
            'Accept',
            # see https://github.com/fiorix/cyclone/issues/20
            self.request.headers.get('accept', ''))

        if acceptHeader == 'application/nquads':
            self.set_header('Content-type', 'application/nquads')
            self.masterGraph.serialize(self, format='nquads')
        elif acceptHeader == 'application/ld+json':
            self.set_header('Content-type', 'application/ld+json')
            self.masterGraph.serialize(self, format='json-ld', indent=2)
        elif acceptHeader.startswith('text/html'):
            self._writeGraphForBrowser()
        else:
            self.set_header('Content-type', 'application/x-trig')
            self.masterGraph.serialize(self, format='trig')

    def _writeGraphForBrowser(self):
        # We think this is a browser, so respond with a live graph view
        # (todo)
        # The body below is written as UTF-8 bytes; declare it, or browsers
        # may guess another encoding and garble non-ASCII literals.
        self.set_header('Content-type', 'text/html; charset=utf-8')

        self.write(b'''
<html><body><pre>''')

        # Bind display prefixes on a scratch graph. A NamespaceManager built
        # on the master graph writes its bindings into the master graph's
        # store, so every browser visit would change the prefixes used by
        # later TriG/N-Quads responses. Copy the master's own prefixes first
        # so the view still shows them.
        ns = NamespaceManager(ConjunctiveGraph())
        for prefix, namespace in self.masterGraph._graph.namespaces():
            ns.bind(prefix, namespace)
        # maybe these could be on the PatchableGraph instance
        ns.bind('ex', 'http://example.com/')
        ns.bind('', 'http://projects.bigasterisk.com/room/')
        ns.bind("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")
        ns.bind("xsd", "http://www.w3.org/2001/XMLSchema#")

        for s, p, o, ctx in sorted(self.masterGraph._graph.quads()):
            g = ctx.identifier
            nquadLine = f'{s.n3(ns)} {p.n3(ns)} {o.n3(ns)} {g.n3(ns)} .\n'
            # Escape so IRIs/literals containing '<' or '&' render as text.
            self.write(html.escape(nquadLine).encode('utf8'))

        self.write(b'''
</pre>
<p>
<a href="#">[refresh]</a>
<label><input type="checkbox"> Auto-refresh</label>
</p>
<script>

if (new URL(window.location).searchParams.get('autorefresh') == 'on') {
document.querySelector("input").checked = true;
setTimeout(() => {
requestAnimationFrame(() => {
window.location.replace(window.location.href);
});
}, 2000);
}

document.querySelector("a").addEventListener("click", (ev) => {
ev.preventDefault();
window.location.replace(window.location.href);

});
document.querySelector("input").addEventListener("change", (ev) => {
if (document.querySelector("input").checked) {
const u = new URL(window.location);
u.searchParams.set('autorefresh', 'on');
window.location.replace(u.href);
} else {
const u = new URL(window.location);
u.searchParams.delete('autorefresh');
window.location.replace(u.href);
}
});

</script>
</body></html>
''')
211
212
class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One server-sent-events session with one client.

    On bind it sends the current graph as a 'fullGraph' event, then relays
    each later patch as a 'patch' event, so the remote copy stays in sync
    with ours.

    Instead of turning off buffering everywhere, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """

    def __init__(self, application, request, masterGraph):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the full graph, then subscribe to its patches."""
        graph = self.masterGraph
        with graph._sendFullGraph.time():
            fullJson = graph.asJsonLd()
            log.debug("send fullGraph event: %s", fullJson)
            self.sendEvent(message=fullJson, event=b'fullGraph')
            graph.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Relay one patch (JSON text from the master graph) as a 'patch' event."""
        # todo: throttle and combine patches here; ideally we could measure
        # the latency to the client to make a better rate choice.
        with self.masterGraph._sendPatch.time():
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop relaying patches to this session."""
        self.masterGraph.removeObserver(self.onPatch)