comparison patchablegraph.py @ 4:dc4f852d0d70

reformat and add some types
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 19:47:35 -0800
parents 703adc4f78b1
children 1b6718a54c00
comparison
equal deleted inserted replaced
3:703adc4f78b1 4:dc4f852d0d70
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF 18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of 19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
20 differences between RDF graphs 20 differences between RDF graphs
21 21
22 """ 22 """
23 import json, logging, itertools, html 23 import html
24 24 import itertools
25 import json
26 import logging
27 from typing import Callable, List, Optional, cast
28
29 import cyclone.sse
30 import cyclone.web
31 from cycloneerr import PrettyErrorHandler
25 from prometheus_client import Counter, Gauge, Summary 32 from prometheus_client import Counter, Gauge, Summary
26 from rdfdb.grapheditapi import GraphEditApi 33 from rdfdb.grapheditapi import GraphEditApi
34 from rdfdb.patch import Patch
35 from rdfdb.rdflibpatch import inGraph, patchQuads
27 from rdflib import ConjunctiveGraph 36 from rdflib import ConjunctiveGraph
28 from rdflib.namespace import NamespaceManager 37 from rdflib.namespace import NamespaceManager
29 from rdflib.parser import StringInputSource 38 from rdflib.parser import StringInputSource
30 from rdflib.plugins.serializers.jsonld import from_rdf 39 from rdflib.plugins.serializers.jsonld import from_rdf
31 import cyclone.sse
32 from cycloneerr import PrettyErrorHandler
33 from rdfdb.patch import Patch
34 from rdfdb.rdflibpatch import patchQuads, inGraph
35 40
36 log = logging.getLogger('patchablegraph') 41 log = logging.getLogger('patchablegraph')
37 42
38 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls') 43 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls')
39 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls') 44 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls')
44 49
45 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py 50 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
46 def _graphFromQuads2(q): 51 def _graphFromQuads2(q):
47 g = ConjunctiveGraph() 52 g = ConjunctiveGraph()
48 #g.addN(q) # no effect on nquad output 53 #g.addN(q) # no effect on nquad output
49 for s,p,o,c in q: 54 for s, p, o, c in q:
50 g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code 55 g.get_context(c).add((s, p, o)) # kind of works with broken rdflib nquad serializer code
51 #g.store.add((s,p,o), c) # no effect on nquad output 56 #g.store.add((s,p,o), c) # no effect on nquad output
52 return g 57 return g
53 58
54 def jsonFromPatch(p): 59
55 return json.dumps({'patch': { 60 def jsonFromPatch(p: Patch) -> str:
56 'adds': from_rdf(_graphFromQuads2(p.addQuads)), 61 return json.dumps(
57 'deletes': from_rdf(_graphFromQuads2(p.delQuads)), 62 {'patch': {
58 }}) 63 'adds': from_rdf(_graphFromQuads2(p.addQuads)),
59 patchAsJson = jsonFromPatch # deprecated name 64 'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
60 65 }})
61 66
62 def patchFromJson(j): 67
68 patchAsJson = jsonFromPatch # deprecated name
69
70
71 def patchFromJson(j: str) -> Patch:
63 body = json.loads(j)['patch'] 72 body = json.loads(j)['patch']
64 a = ConjunctiveGraph() 73 a = ConjunctiveGraph()
65 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld') 74 a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
66 d = ConjunctiveGraph() 75 d = ConjunctiveGraph()
67 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld') 76 d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
68 return Patch(addGraph=a, delGraph=d) 77 return Patch(addGraph=a, delGraph=d)
69 78
70 def graphAsJson(g): 79
80 def graphAsJson(g: ConjunctiveGraph) -> str:
71 # This is not the same as g.serialize(format='json-ld')! That 81 # This is not the same as g.serialize(format='json-ld')! That
72 # version omits literal datatypes. 82 # version omits literal datatypes.
73 return json.dumps(from_rdf(g)) 83 return json.dumps(from_rdf(g))
74 84
85
75 _graphsInProcess = itertools.count() 86 _graphsInProcess = itertools.count()
87
88
76 class PatchableGraph(GraphEditApi): 89 class PatchableGraph(GraphEditApi):
77 """ 90 """
78 Master graph that you modify with self.patch, and we get the 91 Master graph that you modify with self.patch, and we get the
79 updates to all current listeners. 92 updates to all current listeners.
80 """ 93 """
81 def __init__(self): 94
95 def __init__(self, label: Optional[str] = None):
82 self._graph = ConjunctiveGraph() 96 self._graph = ConjunctiveGraph()
83 self._observers = [] 97 self._observers: List[Callable[[str], None]] = []
84 scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) 98 if label is None:
85 99 label = f'patchableGraph{next(_graphsInProcess)}'
86 _serialize = scales.PmfStat('serialize') 100 self.label = label
87 def serialize(self, *arg, **kw): 101
88 with self._serialize.time(): 102 def serialize(self, *arg, **kw) -> bytes:
89 return self._graph.serialize(*arg, **kw) 103 with SERIALIZE_CALLS.labels(graph=self.label).time():
90 104 return cast(bytes, self._graph.serialize(*arg, **kw))
91 def patch(self, p): 105
106 def patch(self, p: Patch):
92 with PATCH_CALLS.labels(graph=self.label).time(): 107 with PATCH_CALLS.labels(graph=self.label).time():
93 # assuming no stmt is both in p.addQuads and p.delQuads. 108 # assuming no stmt is both in p.addQuads and p.delQuads.
94 dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) 109 dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
95 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) 110 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
96 minimizedP = Patch(addQuads=adds, delQuads=dels) 111 minimizedP = Patch(addQuads=adds, delQuads=dels)
97 if minimizedP.isNoop(): 112 if minimizedP.isNoop():
98 return 113 return
99 patchQuads(self._graph, 114 patchQuads(self._graph, deleteQuads=dels, addQuads=adds, perfect=False) # true?
100 deleteQuads=dels,
101 addQuads=adds,
102 perfect=False) # true?
103 for ob in self._observers: 115 for ob in self._observers:
104 ob(patchAsJson(p)) 116 ob(patchAsJson(p))
105 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) 117 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
106 118
107 def asJsonLd(self): 119 def asJsonLd(self) -> str:
108 return graphAsJson(self._graph) 120 return graphAsJson(self._graph)
109 121
110 def addObserver(self, onPatch): 122 def addObserver(self, onPatch: Callable[[str], None]):
111 self._observers.append(onPatch) 123 self._observers.append(onPatch)
112 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers)) 124 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))
113 OBSERVERS_ADDED.labels(graph=self.label).inc() 125 OBSERVERS_ADDED.labels(graph=self.label).inc()
114 126
115 def removeObserver(self, onPatch): 127 def removeObserver(self, onPatch: Callable[[str], None]):
116 try: 128 try:
117 self._observers.remove(onPatch) 129 self._observers.remove(onPatch)
118 except ValueError: 130 except ValueError:
119 pass 131 pass
120 self._currentObservers = len(self._observers) 132 self._currentObservers = len(self._observers)
121 133
122 def setToGraph(self, newGraph): 134 def setToGraph(self, newGraph: ConjunctiveGraph):
123 self.patch(Patch.fromDiff(self._graph, newGraph)) 135 self.patch(Patch.fromDiff(self._graph, newGraph))
124 136
125 137
126 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') 138 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
127 139
128 140
129 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): 141 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
142
130 def initialize(self, masterGraph: PatchableGraph): 143 def initialize(self, masterGraph: PatchableGraph):
131 self.masterGraph = masterGraph 144 self.masterGraph = masterGraph
132 145
133 def get(self): 146 def get(self):
134 with SEND_SIMPLE_GRAPH.time(): 147 with SEND_SIMPLE_GRAPH.time():
225 238
226 instead of turning off buffering all over, it may work for this 239 instead of turning off buffering all over, it may work for this
227 response to send 'x-accel-buffering: no', per 240 response to send 'x-accel-buffering: no', per
228 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering 241 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
229 """ 242 """
243
230 def __init__(self, application, request, masterGraph): 244 def __init__(self, application, request, masterGraph):
231 cyclone.sse.SSEHandler.__init__(self, application, request) 245 cyclone.sse.SSEHandler.__init__(self, application, request)
232 self.masterGraph = masterGraph 246 self.masterGraph = masterGraph
233 247
234 def bind(self): 248 def bind(self):