Mercurial > code > home > repos > patchablegraph
comparison patchablegraph.py @ 3:703adc4f78b1
scales -> prometheus
author | drewp@bigasterisk.com |
---|---|
date | Wed, 24 Nov 2021 19:47:06 -0800 |
parents | c3f0a692c4cb |
children | dc4f852d0d70 |
comparison
equal
deleted
inserted
replaced
2:a8a001175948 | 3:703adc4f78b1 |
---|---|
20 differences between RDF graphs | 20 differences between RDF graphs |
21 | 21 |
22 """ | 22 """ |
23 import json, logging, itertools, html | 23 import json, logging, itertools, html |
24 | 24 |
25 from greplin import scales | 25 from prometheus_client import Counter, Gauge, Summary |
26 from rdfdb.grapheditapi import GraphEditApi | 26 from rdfdb.grapheditapi import GraphEditApi |
27 from rdflib import ConjunctiveGraph | 27 from rdflib import ConjunctiveGraph |
28 from rdflib.namespace import NamespaceManager | 28 from rdflib.namespace import NamespaceManager |
29 from rdflib.parser import StringInputSource | 29 from rdflib.parser import StringInputSource |
30 from rdflib.plugins.serializers.jsonld import from_rdf | 30 from rdflib.plugins.serializers.jsonld import from_rdf |
33 from rdfdb.patch import Patch | 33 from rdfdb.patch import Patch |
34 from rdfdb.rdflibpatch import patchQuads, inGraph | 34 from rdfdb.rdflibpatch import patchQuads, inGraph |
35 | 35 |
36 log = logging.getLogger('patchablegraph') | 36 log = logging.getLogger('patchablegraph') |
37 | 37 |
38 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls') | |
39 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls') | |
40 STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size') | |
41 OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count') | |
42 OBSERVERS_ADDED = Counter('observers_added', 'observers added') | |
43 | |
44 | |
38 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py | 45 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py |
39 def _graphFromQuads2(q): | 46 def _graphFromQuads2(q): |
40 g = ConjunctiveGraph() | 47 g = ConjunctiveGraph() |
41 #g.addN(q) # no effect on nquad output | 48 #g.addN(q) # no effect on nquad output |
42 for s,p,o,c in q: | 49 for s,p,o,c in q: |
79 _serialize = scales.PmfStat('serialize') | 86 _serialize = scales.PmfStat('serialize') |
80 def serialize(self, *arg, **kw): | 87 def serialize(self, *arg, **kw): |
81 with self._serialize.time(): | 88 with self._serialize.time(): |
82 return self._graph.serialize(*arg, **kw) | 89 return self._graph.serialize(*arg, **kw) |
83 | 90 |
84 _patch = scales.PmfStat('patch') | |
85 _len = scales.IntStat('statementCount') | |
86 def patch(self, p): | 91 def patch(self, p): |
87 with self._patch.time(): | 92 with PATCH_CALLS.labels(graph=self.label).time(): |
88 # assuming no stmt is both in p.addQuads and p.delQuads. | 93 # assuming no stmt is both in p.addQuads and p.delQuads. |
89 dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) | 94 dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) |
90 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) | 95 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) |
91 minimizedP = Patch(addQuads=adds, delQuads=dels) | 96 minimizedP = Patch(addQuads=adds, delQuads=dels) |
92 if minimizedP.isNoop(): | 97 if minimizedP.isNoop(): |
95 deleteQuads=dels, | 100 deleteQuads=dels, |
96 addQuads=adds, | 101 addQuads=adds, |
97 perfect=False) # true? | 102 perfect=False) # true? |
98 for ob in self._observers: | 103 for ob in self._observers: |
99 ob(patchAsJson(p)) | 104 ob(patchAsJson(p)) |
100 self._len = len(self._graph) | 105 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph)) |
101 | 106 |
102 def asJsonLd(self): | 107 def asJsonLd(self): |
103 return graphAsJson(self._graph) | 108 return graphAsJson(self._graph) |
104 | 109 |
105 _currentObservers = scales.IntStat('observers/current') | |
106 _observersAdded = scales.IntStat('observers/added') | |
107 def addObserver(self, onPatch): | 110 def addObserver(self, onPatch): |
108 self._observers.append(onPatch) | 111 self._observers.append(onPatch) |
109 self._currentObservers = len(self._observers) | 112 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers)) |
110 self._observersAdded += 1 | 113 OBSERVERS_ADDED.labels(graph=self.label).inc() |
111 | 114 |
112 def removeObserver(self, onPatch): | 115 def removeObserver(self, onPatch): |
113 try: | 116 try: |
114 self._observers.remove(onPatch) | 117 self._observers.remove(onPatch) |
115 except ValueError: | 118 except ValueError: |
117 self._currentObservers = len(self._observers) | 120 self._currentObservers = len(self._observers) |
118 | 121 |
119 def setToGraph(self, newGraph): | 122 def setToGraph(self, newGraph): |
120 self.patch(Patch.fromDiff(self._graph, newGraph)) | 123 self.patch(Patch.fromDiff(self._graph, newGraph)) |
121 | 124 |
122 _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') | 125 |
123 _sendFullGraph = scales.PmfStat('serve/events/sendFull') | 126 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse') |
124 _sendPatch = scales.PmfStat('serve/events/sendPatch') | |
125 | 127 |
126 | 128 |
127 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): | 129 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): |
128 def initialize(self, masterGraph: PatchableGraph): | 130 def initialize(self, masterGraph: PatchableGraph): |
129 self.masterGraph = masterGraph | 131 self.masterGraph = masterGraph |
130 | 132 |
131 def get(self): | 133 def get(self): |
132 with self.masterGraph._sendSimpleGraph.time(): | 134 with SEND_SIMPLE_GRAPH.time(): |
133 self._writeGraphResponse() | 135 self._writeGraphResponse() |
134 | 136 |
135 def _writeGraphResponse(self): | 137 def _writeGraphResponse(self): |
136 acceptHeader = self.request.headers.get( | 138 acceptHeader = self.request.headers.get( |
137 'Accept', | 139 'Accept', |
208 </script> | 210 </script> |
209 </body></html> | 211 </body></html> |
210 ''') | 212 ''') |
211 | 213 |
212 | 214 |
215 SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events') | |
216 SEND_PATCH = Summary('send_patch', 'patch SSE events') | |
217 | |
218 | |
213 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): | 219 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): |
214 """ | 220 """ |
215 One session with one client. | 221 One session with one client. |
216 | 222 |
217 returns current graph plus future patches to keep remote version | 223 returns current graph plus future patches to keep remote version |
224 def __init__(self, application, request, masterGraph): | 230 def __init__(self, application, request, masterGraph): |
225 cyclone.sse.SSEHandler.__init__(self, application, request) | 231 cyclone.sse.SSEHandler.__init__(self, application, request) |
226 self.masterGraph = masterGraph | 232 self.masterGraph = masterGraph |
227 | 233 |
228 def bind(self): | 234 def bind(self): |
229 with self.masterGraph._sendFullGraph.time(): | 235 with SEND_FULL_GRAPH.time(): |
230 graphJson = self.masterGraph.asJsonLd() | 236 graphJson = self.masterGraph.asJsonLd() |
231 log.debug("send fullGraph event: %s", graphJson) | 237 log.debug("send fullGraph event: %s", graphJson) |
232 self.sendEvent(message=graphJson, event=b'fullGraph') | 238 self.sendEvent(message=graphJson, event=b'fullGraph') |
233 self.masterGraph.addObserver(self.onPatch) | 239 self.masterGraph.addObserver(self.onPatch) |
234 | 240 |
235 def onPatch(self, patchJson): | 241 def onPatch(self, patchJson): |
236 with self.masterGraph._sendPatch.time(): | 242 with SEND_PATCH.time(): |
237 # throttle and combine patches here- ideally we could see how | 243 # throttle and combine patches here- ideally we could see how |
238 # long the latency to the client is to make a better rate choice | 244 # long the latency to the client is to make a better rate choice |
239 self.sendEvent(message=patchJson, event=b'patch') | 245 self.sendEvent(message=patchJson, event=b'patch') |
240 | 246 |
241 def unbind(self): | 247 def unbind(self): |