comparison patchablegraph.py @ 3:703adc4f78b1

scales -> prometheus
author drewp@bigasterisk.com
date Wed, 24 Nov 2021 19:47:06 -0800
parents c3f0a692c4cb
children dc4f852d0d70
comparison
equal deleted inserted replaced
2:a8a001175948 3:703adc4f78b1
20 differences between RDF graphs 20 differences between RDF graphs
21 21
22 """ 22 """
23 import json, logging, itertools, html 23 import json, logging, itertools, html
24 24
25 from greplin import scales 25 from prometheus_client import Counter, Gauge, Summary
26 from rdfdb.grapheditapi import GraphEditApi 26 from rdfdb.grapheditapi import GraphEditApi
27 from rdflib import ConjunctiveGraph 27 from rdflib import ConjunctiveGraph
28 from rdflib.namespace import NamespaceManager 28 from rdflib.namespace import NamespaceManager
29 from rdflib.parser import StringInputSource 29 from rdflib.parser import StringInputSource
30 from rdflib.plugins.serializers.jsonld import from_rdf 30 from rdflib.plugins.serializers.jsonld import from_rdf
33 from rdfdb.patch import Patch 33 from rdfdb.patch import Patch
34 from rdfdb.rdflibpatch import patchQuads, inGraph 34 from rdfdb.rdflibpatch import patchQuads, inGraph
35 35
36 log = logging.getLogger('patchablegraph') 36 log = logging.getLogger('patchablegraph')
37 37
38 SERIALIZE_CALLS = Summary('serialize_calls', 'PatchableGraph.serialize calls')
39 PATCH_CALLS = Summary('patch_calls', 'PatchableGraph.patch calls')
40 STATEMENT_COUNT = Gauge('statement_count', 'current PatchableGraph graph size')
41 OBSERVERS_CURRENT = Gauge('observers_current', 'current observer count')
42 OBSERVERS_ADDED = Counter('observers_added', 'observers added')
43
44
38 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py 45 # forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
39 def _graphFromQuads2(q): 46 def _graphFromQuads2(q):
40 g = ConjunctiveGraph() 47 g = ConjunctiveGraph()
41 #g.addN(q) # no effect on nquad output 48 #g.addN(q) # no effect on nquad output
42 for s,p,o,c in q: 49 for s,p,o,c in q:
79 _serialize = scales.PmfStat('serialize') 86 _serialize = scales.PmfStat('serialize')
80 def serialize(self, *arg, **kw): 87 def serialize(self, *arg, **kw):
81 with self._serialize.time(): 88 with self._serialize.time():
82 return self._graph.serialize(*arg, **kw) 89 return self._graph.serialize(*arg, **kw)
83 90
84 _patch = scales.PmfStat('patch')
85 _len = scales.IntStat('statementCount')
86 def patch(self, p): 91 def patch(self, p):
87 with self._patch.time(): 92 with PATCH_CALLS.labels(graph=self.label).time():
88 # assuming no stmt is both in p.addQuads and p.delQuads. 93 # assuming no stmt is both in p.addQuads and p.delQuads.
89 dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) 94 dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
90 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) 95 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
91 minimizedP = Patch(addQuads=adds, delQuads=dels) 96 minimizedP = Patch(addQuads=adds, delQuads=dels)
92 if minimizedP.isNoop(): 97 if minimizedP.isNoop():
95 deleteQuads=dels, 100 deleteQuads=dels,
96 addQuads=adds, 101 addQuads=adds,
97 perfect=False) # true? 102 perfect=False) # true?
98 for ob in self._observers: 103 for ob in self._observers:
99 ob(patchAsJson(p)) 104 ob(patchAsJson(p))
100 self._len = len(self._graph) 105 STATEMENT_COUNT.labels(graph=self.label).set(len(self._graph))
101 106
102 def asJsonLd(self): 107 def asJsonLd(self):
103 return graphAsJson(self._graph) 108 return graphAsJson(self._graph)
104 109
105 _currentObservers = scales.IntStat('observers/current')
106 _observersAdded = scales.IntStat('observers/added')
107 def addObserver(self, onPatch): 110 def addObserver(self, onPatch):
108 self._observers.append(onPatch) 111 self._observers.append(onPatch)
109 self._currentObservers = len(self._observers) 112 OBSERVERS_CURRENT.labels(graph=self.label).set(len(self._observers))
110 self._observersAdded += 1 113 OBSERVERS_ADDED.labels(graph=self.label).inc()
111 114
112 def removeObserver(self, onPatch): 115 def removeObserver(self, onPatch):
113 try: 116 try:
114 self._observers.remove(onPatch) 117 self._observers.remove(onPatch)
115 except ValueError: 118 except ValueError:
117 self._currentObservers = len(self._observers) 120 self._currentObservers = len(self._observers)
118 121
119 def setToGraph(self, newGraph): 122 def setToGraph(self, newGraph):
120 self.patch(Patch.fromDiff(self._graph, newGraph)) 123 self.patch(Patch.fromDiff(self._graph, newGraph))
121 124
122 _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') 125
123 _sendFullGraph = scales.PmfStat('serve/events/sendFull') 126 SEND_SIMPLE_GRAPH = Summary('send_simple_graph', 'calls to _writeGraphResponse')
124 _sendPatch = scales.PmfStat('serve/events/sendPatch')
125 127
126 128
127 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): 129 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
128 def initialize(self, masterGraph: PatchableGraph): 130 def initialize(self, masterGraph: PatchableGraph):
129 self.masterGraph = masterGraph 131 self.masterGraph = masterGraph
130 132
131 def get(self): 133 def get(self):
132 with self.masterGraph._sendSimpleGraph.time(): 134 with SEND_SIMPLE_GRAPH.time():
133 self._writeGraphResponse() 135 self._writeGraphResponse()
134 136
135 def _writeGraphResponse(self): 137 def _writeGraphResponse(self):
136 acceptHeader = self.request.headers.get( 138 acceptHeader = self.request.headers.get(
137 'Accept', 139 'Accept',
208 </script> 210 </script>
209 </body></html> 211 </body></html>
210 ''') 212 ''')
211 213
212 214
215 SEND_FULL_GRAPH = Summary('send_full_graph', 'fullGraph SSE events')
216 SEND_PATCH = Summary('send_patch', 'patch SSE events')
217
218
213 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): 219 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
214 """ 220 """
215 One session with one client. 221 One session with one client.
216 222
217 returns current graph plus future patches to keep remote version 223 returns current graph plus future patches to keep remote version
224 def __init__(self, application, request, masterGraph): 230 def __init__(self, application, request, masterGraph):
225 cyclone.sse.SSEHandler.__init__(self, application, request) 231 cyclone.sse.SSEHandler.__init__(self, application, request)
226 self.masterGraph = masterGraph 232 self.masterGraph = masterGraph
227 233
228 def bind(self): 234 def bind(self):
229 with self.masterGraph._sendFullGraph.time(): 235 with SEND_FULL_GRAPH.time():
230 graphJson = self.masterGraph.asJsonLd() 236 graphJson = self.masterGraph.asJsonLd()
231 log.debug("send fullGraph event: %s", graphJson) 237 log.debug("send fullGraph event: %s", graphJson)
232 self.sendEvent(message=graphJson, event=b'fullGraph') 238 self.sendEvent(message=graphJson, event=b'fullGraph')
233 self.masterGraph.addObserver(self.onPatch) 239 self.masterGraph.addObserver(self.onPatch)
234 240
235 def onPatch(self, patchJson): 241 def onPatch(self, patchJson):
236 with self.masterGraph._sendPatch.time(): 242 with SEND_PATCH.time():
237 # throttle and combine patches here- ideally we could see how 243 # throttle and combine patches here- ideally we could see how
238 # long the latency to the client is to make a better rate choice 244 # long the latency to the client is to make a better rate choice
239 self.sendEvent(message=patchJson, event=b'patch') 245 self.sendEvent(message=patchJson, event=b'patch')
240 246
241 def unbind(self): 247 def unbind(self):