comparison lib/patchablegraph/patchablegraph.py @ 514:495f573af4f4

make patchablegraph release Ignore-this: f55c9a56b052797ff23a80630714b51a
author drewp@bigasterisk.com
date Mon, 22 Apr 2019 23:29:19 -0700
parents lib/patchablegraph.py@1d2817cb9a6f
children 83ccc9ba90ea
comparison
equal deleted inserted replaced
513:7a7002c95d09 514:495f573af4f4
1 """
2 Design:
3
4 1. Services each have (named) graphs, which they patch as things
5 change. PatchableGraph is an object for holding this graph.
6 2. You can http GET that graph, or ...
7 3. You can http GET/SSE that graph and hear about modifications to it
8 4. The client that got the graph holds and maintains a copy. The
9 client may merge together multiple graphs.
10 5. Client queries its graph with low-level APIs or client-side sparql.
11 6. When the graph changes, the client knows and can update itself at
12 low or high granularity.
13
14
15 See also:
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
17 models
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
20 differences between RDF graphs
21
22 """
23 import json, logging, itertools
24
25 from greplin import scales
26 from rdfdb.grapheditapi import GraphEditApi
27 from rdflib import ConjunctiveGraph
28 from rdflib.parser import StringInputSource
29 from rdflib_jsonld.serializer import from_rdf
30 import cyclone.sse
31
32 from cycloneerr import PrettyErrorHandler
33 from rdfdb.patch import Patch
34 from rdfdb.rdflibpatch import patchQuads, inGraph
35
36 log = logging.getLogger('patchablegraph')
37
def writeGraphResponse(req, graph, acceptHeader):
    """
    Serialize `graph` into the response `req`, choosing the format from
    the request's Accept header value.

    req: response object with set_header(name, value); it is also
      handed to graph.serialize as the output destination.
    graph: object with serialize(destination, format=..., **kw).
    acceptHeader: raw Accept header value, or None if the client sent
      none.

    'application/nquads' and 'application/ld+json' are honored;
    anything else (including a missing header) gets trig. The header
    may be a comma-separated list with parameters, e.g.
    'application/ld+json; q=0.9, */*'. The first listed media type that
    we support wins; q-values are not ranked.
    """
    # media type -> (rdflib serializer format, extra serialize kwargs)
    supported = {
        'application/nquads': ('nquads', {}),
        'application/ld+json': ('json-ld', {'indent': 2}),
    }
    contentType, fmt, extra = 'application/x-trig', 'trig', {}

    for item in (acceptHeader or '').split(','):
        # drop parameters such as ';q=0.8'; media types are case-insensitive
        mediaType = item.split(';', 1)[0].strip().lower()
        if mediaType in supported:
            contentType = mediaType
            fmt, extra = supported[mediaType]
            break

    req.set_header('Content-type', contentType)
    graph.serialize(req, format=fmt, **extra)
48
49 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """
    Build a new ConjunctiveGraph from an iterable of
    (subject, predicate, object, context) quads.

    Each triple is added through the context's own graph. The
    alternatives tried here, result.addN(q) and
    result.store.add((s, p, o), c), had no effect on nquad output;
    adding via get_context kind of works with the broken rdflib nquad
    serializer code.
    """
    result = ConjunctiveGraph()
    for subj, pred, obj, ctx in q:
        contextGraph = result.get_context(ctx)
        contextGraph.add((subj, pred, obj))
    return result
57
def jsonFromPatch(p):
    """
    Return a JSON string of the form
    {"patch": {"adds": <json-ld>, "deletes": <json-ld>}}
    built from p.addQuads and p.delQuads.
    """
    payload = {
        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
    }
    return json.dumps({'patch': payload})


patchAsJson = jsonFromPatch  # deprecated name
64
65
def patchFromJson(j):
    """
    Inverse of jsonFromPatch: parse a JSON string with a top-level
    'patch' object and return a Patch whose add/delete graphs are
    parsed from the 'adds' and 'deletes' json-ld values.
    """
    body = json.loads(j)['patch']

    def parsedGraph(key):
        # re-encode just this section so rdflib's json-ld parser can read it
        g = ConjunctiveGraph()
        source = StringInputSource(json.dumps(body[key]).encode('utf8'))
        g.parse(source, format='json-ld')
        return g

    # keyword arguments evaluate left to right: adds first, then deletes
    return Patch(addGraph=parsedGraph('adds'), delGraph=parsedGraph('deletes'))
73
def graphAsJson(g):
    """
    Return graph `g` as a json-ld string.

    Deliberately goes through from_rdf rather than
    g.serialize(format='json-ld'): that version omits literal
    datatypes.
    """
    jsonLdDoc = from_rdf(g)
    return json.dumps(jsonLdDoc)
78
# Numbers each PatchableGraph created in this process so every instance
# gets a distinct stats path.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.

    Listeners are callables registered with addObserver; each one is
    called with the JSON string produced by patchAsJson for every patch
    that actually changes the graph.
    """

    def __init__(self):
        self._graph = ConjunctiveGraph()
        self._observers = []
        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))

    _serialize = scales.PmfStat('serialize')

    def serialize(self, to, **kw):
        """Serialize the held graph to `to`; kw is passed through to
        the underlying graph's serialize. Timed by the 'serialize' stat."""
        with self._serialize.time():
            return self._graph.serialize(to, **kw)

    _patch = scales.PmfStat('patch')
    _len = scales.IntStat('statementCount')

    def patch(self, p):
        """
        Apply Patch `p` to the graph and notify observers.

        Deletes of quads that are not in the graph and adds of quads
        already present are dropped before applying. If nothing is
        left, this returns without touching the graph or calling any
        observer.
        """
        with self._patch.time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph,
                       deleteQuads=dels,
                       addQuads=adds,
                       perfect=False)  # true?

            # Record the new size before notifying, so the stat is right
            # even if an observer raises.
            self._len = len(self._graph)

            if self._observers:
                # Serialize once for everyone instead of once per observer.
                # NOTE(review): observers are sent the caller's patch `p`,
                # not minimizedP; kept as-is to preserve what clients see.
                patchJson = patchAsJson(p)
                # Iterate over a snapshot: an observer may unregister
                # itself (or another) while we are dispatching.
                for ob in list(self._observers):
                    ob(patchJson)

    def asJsonLd(self):
        """Return the whole graph as a json-ld string."""
        return graphAsJson(self._graph)

    _currentObservers = scales.IntStat('observers/current')
    _observersAdded = scales.IntStat('observers/added')

    def addObserver(self, onPatch):
        """Register callable `onPatch` to receive each future patch as a
        JSON string."""
        self._observers.append(onPatch)
        self._currentObservers = len(self._observers)
        self._observersAdded += 1

    def removeObserver(self, onPatch):
        """Unregister `onPatch`; removing one that is not registered is
        not an error."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass
        self._currentObservers = len(self._observers)

    def setToGraph(self, newGraph):
        """Patch the held graph with the difference between it and
        `newGraph`, as computed by Patch.fromDiff."""
        self.patch(Patch.fromDiff(self._graph, newGraph))

    # Timers used by the cyclone handlers that serve this graph.
    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
    _sendPatch = scales.PmfStat('serve/events/sendPatch')
136
class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    Plain GET of the whole graph, in the format picked from the
    request's 'accept' header by writeGraphResponse.
    """

    def initialize(self, masterGraph):
        self.masterGraph = masterGraph

    def get(self):
        graph = self.masterGraph
        # timed under the graph's 'serve/simpleGraph' stat
        with graph._sendSimpleGraph.time():
            accept = self.request.headers.get('accept')
            writeGraphResponse(self, graph, accept)
145
146
class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    Returns the current graph plus future patches to keep the remote
    version in sync with ours.

    Instead of turning off buffering all over, it may work for this
    response to send 'x-accel-buffering: no', per
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """

    def __init__(self, application, request, masterGraph):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the whole graph as a 'fullGraph' event, then register
        for patches."""
        master = self.masterGraph
        with master._sendFullGraph.time():
            fullGraphJson = master.asJsonLd()
            log.debug("send fullGraph event: %s", fullGraphJson)
            self.sendEvent(message=fullGraphJson, event=b'fullGraph')
            # registered only after the full graph has been sent
            master.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Observer callback: forward one patch to the client as a
        'patch' event."""
        with self.masterGraph._sendPatch.time():
            # throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop receiving patches for this session."""
        self.masterGraph.removeObserver(self.onPatch)
177