Mercurial > code > home > repos > homeauto
annotate lib/patchablegraph/patchablegraph.py @ 1512:1e9cfec1be78
don't require first arg on PatchedGraph.serialize
Ignore-this: bc695823e03dbd012e03efa4e40b6aca
darcs-hash:1dbf9271b5ca55c78987c474ef223af9d5b811d2
author | drewp <drewp@bigasterisk.com> |
---|---|
date | Mon, 03 Feb 2020 23:46:29 -0800 |
parents | e41935c0ce96 |
children | c829df2b0dd5 |
rev | line source |
---|---|
1317 | 1 """ |
2 Design: | |
3 | |
4 1. Services each have (named) graphs, which they patch as things | |
5 change. PatchableGraph is an object for holding this graph. | |
6 2. You can http GET that graph, or ... | |
7 3. You can http GET/SSE that graph and hear about modifications to it | |
8 4. The client that got the graph holds and maintains a copy. The | |
9 client may merge together multiple graphs. | |
10 5. Client queries its graph with low-level APIs or client-side sparql. | |
11 6. When the graph changes, the client knows and can update itself at | |
12 low or high granularity. | |
13 | |
14 | |
15 See also: | |
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF | |
17 models | |
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF | |
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of | |
20 differences between RDF graphs | |
21 | |
22 """ | |
23 import json, logging, itertools | |
24 | |
25 from greplin import scales | |
26 from rdfdb.grapheditapi import GraphEditApi | |
27 from rdflib import ConjunctiveGraph | |
28 from rdflib.parser import StringInputSource | |
29 from rdflib_jsonld.serializer import from_rdf | |
30 import cyclone.sse | |
31 | |
32 from cycloneerr import PrettyErrorHandler | |
33 from rdfdb.patch import Patch | |
34 from rdfdb.rdflibpatch import patchQuads, inGraph | |
35 | |
# Module-wide logger; callers configure the 'patchablegraph' logger by name.
log = logging.getLogger(name='patchablegraph')
38 def _writeGraphForBrowser(req, graph): |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
39 # We think this is a browser, so respond with a live graph view |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
40 # (todo) |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
41 req.set_header('Content-type', 'text/plain') |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
42 lines = graph.serialize(None, format='nquads').splitlines() |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
43 lines.sort() |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
44 req.write(b'\n'.join(lines)) |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
45 |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
46 |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
47 def _writeGraphResponse(req, graph, acceptHeader: str): |
1317 | 48 if acceptHeader == 'application/nquads': |
49 req.set_header('Content-type', 'application/nquads') | |
50 graph.serialize(req, format='nquads') | |
51 elif acceptHeader == 'application/ld+json': | |
52 req.set_header('Content-type', 'application/ld+json') | |
53 graph.serialize(req, format='json-ld', indent=2) | |
54 else: | |
1504
d8eba5a51c1f
try a text/plain response if we think it's a browser asking for the graph
drewp <drewp@bigasterisk.com>
parents:
1317
diff
changeset
|
55 if acceptHeader.startswith('text/html'): |
1509
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
56 _writeGraphForBrowser(req, graph) |
1504
d8eba5a51c1f
try a text/plain response if we think it's a browser asking for the graph
drewp <drewp@bigasterisk.com>
parents:
1317
diff
changeset
|
57 return |
1317 | 58 req.set_header('Content-type', 'application/x-trig') |
59 graph.serialize(req, format='trig') | |
60 | |
61 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py | |
62 def _graphFromQuads2(q): | |
63 g = ConjunctiveGraph() | |
64 #g.addN(q) # no effect on nquad output | |
65 for s,p,o,c in q: | |
66 g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code | |
67 #g.store.add((s,p,o), c) # no effect on nquad output | |
68 return g | |
69 | |
70 def jsonFromPatch(p): | |
71 return json.dumps({'patch': { | |
72 'adds': from_rdf(_graphFromQuads2(p.addQuads)), | |
73 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), | |
74 }}) | |
75 patchAsJson = jsonFromPatch # deprecated name | |
76 | |
1505 | 77 |
1317 | 78 def patchFromJson(j): |
79 body = json.loads(j)['patch'] | |
80 a = ConjunctiveGraph() | |
81 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') | |
82 d = ConjunctiveGraph() | |
83 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') | |
84 return Patch(addGraph=a, delGraph=d) | |
85 | |
86 def graphAsJson(g): | |
87 # This is not the same as g.serialize(format='json-ld')! That | |
88 # version omits literal datatypes. | |
89 return json.dumps(from_rdf(g)) | |
90 | |
# Per-process counter that gives each PatchableGraph its own stats path.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch; we send the resulting
    updates to all current listeners.
    """

    def __init__(self):
        self._graph = ConjunctiveGraph()
        self._observers = []
        # Stats for this instance live under /patchableGraph<N>.
        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))

    _serialize = scales.PmfStat('serialize')

    def serialize(self, *arg, **kw):
        """Timed pass-through to ConjunctiveGraph.serialize.

        All positional args (including an optional destination) and keyword
        args are forwarded unchanged; returns whatever rdflib returns.
        """
        with self._serialize.time():
            return self._graph.serialize(*arg, **kw)

    _patch = scales.PmfStat('patch')
    _len = scales.IntStat('statementCount')

    def patch(self, p):
        """Apply Patch *p* to the graph and notify observers.

        The patch is first minimized: deletes keep only quads currently in
        the graph, and adds keep only quads not already there. If nothing is
        left, this is a no-op and observers are not called. Otherwise the
        minimized patch is applied, and each observer is called once with it
        as a JSON string (see jsonFromPatch).
        """
        with self._patch.time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph,
                       deleteQuads=dels,
                       addQuads=adds,
                       perfect=False)  # true?
            if self._observers:
                # Send what was actually applied, not the caller's raw patch.
                # Otherwise remote copies could be told to delete quads that
                # never existed or to add ones they already have. Serialize
                # once for all observers.
                patchJson = jsonFromPatch(minimizedP)
                # Iterate a snapshot: a callback may add/remove observers.
                for ob in list(self._observers):
                    ob(patchJson)
            self._len = len(self._graph)

    def asJsonLd(self):
        """Return the whole graph as a JSON-LD string (see graphAsJson)."""
        return graphAsJson(self._graph)

    _currentObservers = scales.IntStat('observers/current')
    _observersAdded = scales.IntStat('observers/added')

    def addObserver(self, onPatch):
        """Register onPatch(patchJson: str), called after each effective patch."""
        self._observers.append(onPatch)
        self._currentObservers = len(self._observers)
        self._observersAdded += 1

    def removeObserver(self, onPatch):
        """Unregister onPatch; removing an unknown observer is ignored."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass  # best-effort: already removed or never added
        self._currentObservers = len(self._observers)

    def setToGraph(self, newGraph):
        """Patch this graph so its contents match *newGraph*."""
        self.patch(Patch.fromDiff(self._graph, newGraph))

    # Timers used by the HTTP handlers below (accessed as masterGraph._send*).
    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
    _sendPatch = scales.PmfStat('serve/events/sendPatch')

149 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): | |
150 def initialize(self, masterGraph): | |
151 self.masterGraph = masterGraph | |
1505 | 152 |
1317 | 153 def get(self): |
154 with self.masterGraph._sendSimpleGraph.time(): | |
1509
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
155 _writeGraphResponse(self, self.masterGraph, |
e41935c0ce96
try to detect browser even better, instead of failing the whole request
drewp <drewp@bigasterisk.com>
parents:
1507
diff
changeset
|
156 self.request.headers.get('accept', '')) |
1317 | 157 |
158 | |
159 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): | |
160 """ | |
161 One session with one client. | |
1505 | 162 |
1317 | 163 returns current graph plus future patches to keep remote version |
164 in sync with ours. | |
165 | |
166 intsead of turning off buffering all over, it may work for this | |
167 response to send 'x-accel-buffering: no', per | |
168 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering | |
169 """ | |
170 def __init__(self, application, request, masterGraph): | |
171 cyclone.sse.SSEHandler.__init__(self, application, request) | |
172 self.masterGraph = masterGraph | |
173 | |
174 def bind(self): | |
175 with self.masterGraph._sendFullGraph.time(): | |
176 graphJson = self.masterGraph.asJsonLd() | |
177 log.debug("send fullGraph event: %s", graphJson) | |
178 self.sendEvent(message=graphJson, event=b'fullGraph') | |
179 self.masterGraph.addObserver(self.onPatch) | |
180 | |
181 def onPatch(self, patchJson): | |
182 with self.masterGraph._sendPatch.time(): | |
183 # throttle and combine patches here- ideally we could see how | |
184 # long the latency to the client is to make a better rate choice | |
185 self.sendEvent(message=patchJson, event=b'patch') | |
1505 | 186 |
1317 | 187 def unbind(self): |
188 self.masterGraph.removeObserver(self.onPatch) |