1317
|
1 """
|
|
2 Design:
|
|
3
|
|
4 1. Services each have (named) graphs, which they patch as things
|
|
5 change. PatchableGraph is an object for holding this graph.
|
|
6 2. You can http GET that graph, or ...
|
|
7 3. You can http GET/SSE that graph and hear about modifications to it
|
|
8 4. The client that got the graph holds and maintains a copy. The
|
|
9 client may merge together multiple graphs.
|
|
10 5. Client queries its graph with low-level APIs or client-side sparql.
|
|
11 6. When the graph changes, the client knows and can update itself at
|
|
12 low or high granularity.
|
|
13
|
|
14
|
|
15 See also:
|
|
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
|
|
17 models
|
|
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
|
|
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
|
|
20 differences between RDF graphs
|
|
21
|
|
22 """
|
|
23 import json, logging, itertools
|
|
24
|
|
25 from greplin import scales
|
|
26 from rdfdb.grapheditapi import GraphEditApi
|
|
27 from rdflib import ConjunctiveGraph
|
|
28 from rdflib.parser import StringInputSource
|
|
29 from rdflib_jsonld.serializer import from_rdf
|
|
30 import cyclone.sse
|
|
31
|
|
32 from cycloneerr import PrettyErrorHandler
|
|
33 from rdfdb.patch import Patch
|
|
34 from rdfdb.rdflibpatch import patchQuads, inGraph
|
|
35
|
|
36 log = logging.getLogger('patchablegraph')
|
|
37
|
|
38 def writeGraphResponse(req, graph, acceptHeader):
|
|
39 if acceptHeader == 'application/nquads':
|
|
40 req.set_header('Content-type', 'application/nquads')
|
|
41 graph.serialize(req, format='nquads')
|
|
42 elif acceptHeader == 'application/ld+json':
|
|
43 req.set_header('Content-type', 'application/ld+json')
|
|
44 graph.serialize(req, format='json-ld', indent=2)
|
|
45 else:
|
|
46 req.set_header('Content-type', 'application/x-trig')
|
|
47 graph.serialize(req, format='trig')
|
|
48
|
|
49 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
|
|
50 def _graphFromQuads2(q):
|
|
51 g = ConjunctiveGraph()
|
|
52 #g.addN(q) # no effect on nquad output
|
|
53 for s,p,o,c in q:
|
|
54 g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
|
|
55 #g.store.add((s,p,o), c) # no effect on nquad output
|
|
56 return g
|
|
57
|
|
58 def jsonFromPatch(p):
|
|
59 return json.dumps({'patch': {
|
|
60 'adds': from_rdf(_graphFromQuads2(p.addQuads)),
|
|
61 'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
|
|
62 }})
|
|
63 patchAsJson = jsonFromPatch # deprecated name
|
|
64
|
|
65
|
|
66 def patchFromJson(j):
|
|
67 body = json.loads(j)['patch']
|
|
68 a = ConjunctiveGraph()
|
|
69 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
|
|
70 d = ConjunctiveGraph()
|
|
71 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
|
|
72 return Patch(addGraph=a, delGraph=d)
|
|
73
|
|
74 def graphAsJson(g):
|
|
75 # This is not the same as g.serialize(format='json-ld')! That
|
|
76 # version omits literal datatypes.
|
|
77 return json.dumps(from_rdf(g))
|
|
78
|
|
79 _graphsInProcess = itertools.count()
|
|
80 class PatchableGraph(GraphEditApi):
|
|
81 """
|
|
82 Master graph that you modify with self.patch, and we get the
|
|
83 updates to all current listeners.
|
|
84 """
|
|
85 def __init__(self):
|
|
86 self._graph = ConjunctiveGraph()
|
|
87 self._observers = []
|
|
88 scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))
|
|
89
|
|
90 _serialize = scales.PmfStat('serialize')
|
|
91 def serialize(self, to, **kw):
|
|
92 with self._serialize.time():
|
|
93 return self._graph.serialize(to, **kw)
|
|
94
|
|
95 _patch = scales.PmfStat('patch')
|
|
96 _len = scales.IntStat('statementCount')
|
|
97 def patch(self, p):
|
|
98 with self._patch.time():
|
|
99 # assuming no stmt is both in p.addQuads and p.delQuads.
|
|
100 dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
|
|
101 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
|
|
102 minimizedP = Patch(addQuads=adds, delQuads=dels)
|
|
103 if minimizedP.isNoop():
|
|
104 return
|
|
105 patchQuads(self._graph,
|
|
106 deleteQuads=dels,
|
|
107 addQuads=adds,
|
|
108 perfect=False) # true?
|
|
109 for ob in self._observers:
|
|
110 ob(patchAsJson(p))
|
|
111 self._len = len(self._graph)
|
|
112
|
|
113 def asJsonLd(self):
|
|
114 return graphAsJson(self._graph)
|
|
115
|
|
116 _currentObservers = scales.IntStat('observers/current')
|
|
117 _observersAdded = scales.IntStat('observers/added')
|
|
118 def addObserver(self, onPatch):
|
|
119 self._observers.append(onPatch)
|
|
120 self._currentObservers = len(self._observers)
|
|
121 self._observersAdded += 1
|
|
122
|
|
123 def removeObserver(self, onPatch):
|
|
124 try:
|
|
125 self._observers.remove(onPatch)
|
|
126 except ValueError:
|
|
127 pass
|
|
128 self._currentObservers = len(self._observers)
|
|
129
|
|
130 def setToGraph(self, newGraph):
|
|
131 self.patch(Patch.fromDiff(self._graph, newGraph))
|
|
132
|
|
133 _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
|
|
134 _sendFullGraph = scales.PmfStat('serve/events/sendFull')
|
|
135 _sendPatch = scales.PmfStat('serve/events/sendPatch')
|
|
136
|
|
137 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
|
|
138 def initialize(self, masterGraph):
|
|
139 self.masterGraph = masterGraph
|
|
140
|
|
141 def get(self):
|
|
142 with self.masterGraph._sendSimpleGraph.time():
|
|
143 writeGraphResponse(self, self.masterGraph,
|
|
144 self.request.headers.get('accept'))
|
|
145
|
|
146
|
|
147 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
|
|
148 """
|
|
149 One session with one client.
|
|
150
|
|
151 returns current graph plus future patches to keep remote version
|
|
152 in sync with ours.
|
|
153
|
|
154 intsead of turning off buffering all over, it may work for this
|
|
155 response to send 'x-accel-buffering: no', per
|
|
156 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
|
|
157 """
|
|
158 def __init__(self, application, request, masterGraph):
|
|
159 cyclone.sse.SSEHandler.__init__(self, application, request)
|
|
160 self.masterGraph = masterGraph
|
|
161
|
|
162 def bind(self):
|
|
163 with self.masterGraph._sendFullGraph.time():
|
|
164 graphJson = self.masterGraph.asJsonLd()
|
|
165 log.debug("send fullGraph event: %s", graphJson)
|
|
166 self.sendEvent(message=graphJson, event=b'fullGraph')
|
|
167 self.masterGraph.addObserver(self.onPatch)
|
|
168
|
|
169 def onPatch(self, patchJson):
|
|
170 with self.masterGraph._sendPatch.time():
|
|
171 # throttle and combine patches here- ideally we could see how
|
|
172 # long the latency to the client is to make a better rate choice
|
|
173 self.sendEvent(message=patchJson, event=b'patch')
|
|
174
|
|
175 def unbind(self):
|
|
176 self.masterGraph.removeObserver(self.onPatch)
|
|
177
|