0
|
1 """
|
|
2 Design:
|
|
3
|
|
4 1. Services each have (named) graphs, which they patch as things
|
|
5 change. PatchableGraph is an object for holding this graph.
|
|
6 2. You can http GET that graph, or ...
|
|
7 3. You can http GET/SSE that graph and hear about modifications to it
|
|
8 4. The client that got the graph holds and maintains a copy. The
|
|
9 client may merge together multiple graphs.
|
|
10 5. Client queries its graph with low-level APIs or client-side sparql.
|
|
11 6. When the graph changes, the client knows and can update itself at
|
|
12 low or high granularity.
|
|
13
|
|
14
|
|
15 See also:
|
|
16 * http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
|
|
17 models
|
|
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
|
|
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
|
|
20 differences between RDF graphs
|
|
21
|
|
22 """
|
25
|
23 import asyncio
|
4
|
24 import itertools
|
|
25 import json
|
|
26 import logging
|
25
|
27 import weakref
|
|
28 from typing import Callable, List, NewType, Optional, cast, Set
|
0
|
29
|
3
|
30 from prometheus_client import Counter, Gauge, Summary
|
0
|
31 from rdfdb.grapheditapi import GraphEditApi
|
4
|
32 from rdfdb.patch import Patch
|
|
33 from rdfdb.rdflibpatch import inGraph, patchQuads
|
0
|
34 from rdflib import ConjunctiveGraph
|
|
35 from rdflib.parser import StringInputSource
|
|
36 from rdflib.plugins.serializers.jsonld import from_rdf
|
17
|
37
|
25
|
# Module logger; the name is fixed (not __name__) so log configuration can target 'patchablegraph'.
log = logging.getLogger('patchablegraph')

# Distinct str subtypes for type checkers, so the two serialized forms
# (a JSON patch body vs. a whole graph as JSON-LD) are not mixed up.
JsonSerializedPatch = NewType('JsonSerializedPatch', str)
JsonLdSerializedGraph = NewType('JsonLdSerializedGraph', str)
|
|
42
|
20
|
# Prometheus metrics. Each one carries a 'graph' label, which PatchableGraph
# fills with its own label so several graphs in one process stay separate.
SERIALIZE_CALLS = Summary(
    'serialize_calls',
    'PatchableGraph.serialize calls',
    labelnames=['graph'],
)
PATCH_CALLS = Summary(
    'patch_calls',
    'PatchableGraph.patch calls',
    labelnames=['graph'],
)
STATEMENT_COUNT = Gauge(
    'statement_count',
    'current PatchableGraph graph size',
    labelnames=['graph'],
)
OBSERVERS_CURRENT = Gauge(
    'observers_current',
    'current observer count',
    labelnames=['graph'],
)
OBSERVERS_ADDED = Counter(
    'observers_added',
    'observers added',
    labelnames=['graph'],
)
|
3
|
48
|
|
49
|
0
|
# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
def _graphFromQuads2(q):
    """
    Build a new ConjunctiveGraph containing the given quads.

    q is an iterable of (subject, predicate, object, context) tuples.
    """
    graph = ConjunctiveGraph()
    # Each triple is added through its own context. Earlier notes here say
    # that g.addN(q) and g.store.add((s, p, o), c) had no effect on nquad
    # output, while this form "kind of works with broken rdflib nquad
    # serializer code".
    for subj, pred, obj, ctx in q:
        graph.get_context(ctx).add((subj, pred, obj))
    return graph
|
|
58
|
4
|
59
|
25
|
def jsonFromPatch(p: Patch) -> JsonSerializedPatch:
    """
    Serialize a Patch to a JSON string.

    The result has the shape {'patch': {'adds': ..., 'deletes': ...}},
    where each side is the from_rdf() output for a graph built from
    p.addQuads / p.delQuads respectively.
    """
    addsDoc = from_rdf(_graphFromQuads2(p.addQuads))
    deletesDoc = from_rdf(_graphFromQuads2(p.delQuads))
    body = {'patch': {'adds': addsDoc, 'deletes': deletesDoc}}
    return cast(JsonSerializedPatch, json.dumps(body))


patchAsJson = jsonFromPatch  # deprecated name
|
|
68
|
|
69
|
25
|
def patchFromJson(j: JsonSerializedPatch) -> Patch:
    """
    Parse a JSON string with a top-level 'patch' object back into a Patch.

    The 'adds' and 'deletes' members are each re-encoded as JSON and parsed
    as json-ld into their own ConjunctiveGraph.

    Raises KeyError if 'patch', 'adds' or 'deletes' is missing.
    """

    def _parseJsonLd(doc) -> ConjunctiveGraph:
        # the json-ld parser wants a source object, so round-trip the
        # already-decoded structure through utf8 JSON bytes
        parsed = ConjunctiveGraph()
        source = StringInputSource(json.dumps(doc).encode('utf8'))
        parsed.parse(source, format='json-ld')
        return parsed

    body = json.loads(j)['patch']
    return Patch(
        addGraph=_parseJsonLd(body['adds']),
        delGraph=_parseJsonLd(body['deletes']),
    )
|
|
77
|
4
|
78
|
25
|
def graphAsJson(g: ConjunctiveGraph) -> JsonLdSerializedGraph:
    """Return graph g as a JSON string holding its from_rdf() (JSON-LD) form."""
    # This is not the same as g.serialize(format='json-ld')! That
    # version omits literal datatypes.
    jsonLdDoc = from_rdf(g)
    return cast(JsonLdSerializedGraph, json.dumps(jsonLdDoc))
|
0
|
83
|
4
|
84
|
0
|
# Source of default labels ('patchableGraph0', 'patchableGraph1', ...) for
# graphs created without an explicit label.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch; every effective change
    is forwarded to all current listeners.

    Listeners are the asyncio.Queue objects returned by subscribeToPatches.
    They are held only weakly here, so a listener unsubscribes simply by
    dropping its queue.
    """

    def __init__(self, label: Optional[str] = None):
        """
        label names this graph in log lines and in the 'graph' label of the
        prometheus metrics. When omitted, 'patchableGraph<N>' is used, with
        N taken from a module-level counter.
        """
        self._graph = ConjunctiveGraph()
        self._subscriptions: weakref.WeakSet[asyncio.Queue] = weakref.WeakSet()
        # A weakref callback only fires while the weakref object itself is
        # alive, so the refs that carry _onUnsubscribe have to be held
        # somewhere; this set is that place.
        self._subscriptionRefs: Set[weakref.ref] = set()

        if label is None:
            label = f'patchableGraph{next(_graphsInProcess)}'
        self.label = label
        log.info('making %r', label)

    def serialize(self, *arg, **kw) -> bytes:
        """
        Serialize the current graph, timed under SERIALIZE_CALLS.

        All arguments are passed through to ConjunctiveGraph.serialize.
        NOTE(review): the result is cast to bytes; what rdflib really
        returns may depend on its version and arguments. Confirm.
        """
        with SERIALIZE_CALLS.labels(graph=self.label).time():
            return cast(bytes, self._graph.serialize(*arg, **kw))

    def patch(self, p: Patch):
        """
        Apply p to the graph and notify subscribers, timed under PATCH_CALLS.

        The patch is first minimized against the current graph: deletes of
        absent quads and adds of already-present quads are dropped. If
        nothing is left, the call returns without touching the graph or
        the subscribers. Otherwise the minimized patch is applied and sent
        to every subscriber queue as a ('patch', json) event.
        """
        with PATCH_CALLS.labels(graph=self.label).time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False)  # true?
            if self._subscriptions:
                log.info('PatchableGraph: patched; telling %s observers', len(self._subscriptions))
                # Send what actually changed, not the caller's possibly
                # redundant patch. Serialize once, and only when somebody
                # is listening.
                j = jsonFromPatch(minimizedP)
                for q in self._subscriptions:
                    q.put_nowait(('patch', j))
            STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))

    def asJsonLd(self) -> JsonLdSerializedGraph:
        """Return the whole current graph as a JSON-LD string."""
        return graphAsJson(self._graph)

    def subscribeToPatches(self) -> asyncio.Queue:
        """
        Register a new listener and return its queue.

        The queue first receives one ('graph', jsonld) event holding the
        current graph, then a ('patch', json) event per effective patch.
        Only a weak reference to the queue is kept, so the subscription
        ends when the caller drops the queue.
        """
        q: asyncio.Queue = asyncio.Queue()
        qref = weakref.ref(q, self._onUnsubscribe)
        # keep the ref alive, or its callback would never be invoked
        self._subscriptionRefs.add(qref)
        self._initialSubscribeEvents(qref)
        return q

    def _initialSubscribeEvents(self, qref):
        """Add the queue behind qref to the listeners and send it the full graph."""
        q = qref()
        log.info('new sub queue %s', q)
        self._subscriptions.add(q)  # when caller forgets about queue, we will too
        self._updateObserversGauge()
        OBSERVERS_ADDED.labels(graph=self.label).inc()
        q.put_nowait(('graph', self.asJsonLd()))  # this should be chunked, or just done as reset + patches

    def _onUnsubscribe(self, qref):
        """Weakref callback: a subscriber queue was dropped; refresh the gauge."""
        self._subscriptionRefs.discard(qref)
        self._updateObserversGauge()

    def _updateObserversGauge(self):
        """Set OBSERVERS_CURRENT to the number of live subscriber queues."""
        # Count by iterating rather than len(): iterating a WeakSet yields
        # only members that are still alive, so the count is right even if
        # this runs before the set has pruned a queue that just died.
        liveCount = sum(1 for _ in self._subscriptions)
        OBSERVERS_CURRENT.labels(graph=self.label).set(liveCount)

    def setToGraph(self, newGraph: ConjunctiveGraph):
        """Patch the current graph so that it ends up equal to newGraph."""
        self.patch(Patch.fromDiff(self._graph, newGraph))
|