comparison lib/patchablegraph.py @ 473:388769b5f8ff

stats support and maybe a no-op filtering logic change snuck in there Ignore-this: d54125308243159b28ef11e2d09014f4
author drewp@bigasterisk.com
date Sat, 20 Apr 2019 23:51:02 -0700
parents fcd2c026f51e
children 1d2817cb9a6f
comparison
equal deleted inserted replaced
472:a63549a50b3f 473:388769b5f8ff
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF 18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of 19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
20 differences between RDF graphs 20 differences between RDF graphs
21 21
22 """ 22 """
23 import sys, json, logging 23 import sys, json, logging, itertools
24 import cyclone.sse 24 import cyclone.sse
25 sys.path.append("/my/proj/rdfdb") 25 sys.path.append("/my/proj/rdfdb")
26 from rdfdb.grapheditapi import GraphEditApi 26 from rdfdb.grapheditapi import GraphEditApi
27 from rdflib import ConjunctiveGraph 27 from rdflib import ConjunctiveGraph
28 from rdfdb.rdflibpatch import patchQuads 28 from rdfdb.rdflibpatch import patchQuads
29 from rdfdb.patch import Patch 29 from rdfdb.patch import Patch
30 from rdflib_jsonld.serializer import from_rdf 30 from rdflib_jsonld.serializer import from_rdf
31 from rdflib.parser import StringInputSource 31 from rdflib.parser import StringInputSource
32 from cycloneerr import PrettyErrorHandler 32 from cycloneerr import PrettyErrorHandler
33 from greplin import scales
33 34
34 log = logging.getLogger('patchablegraph') 35 log = logging.getLogger('patchablegraph')
35 36
36 def writeGraphResponse(req, graph, acceptHeader): 37 def writeGraphResponse(req, graph, acceptHeader):
37 if acceptHeader == 'application/nquads': 38 if acceptHeader == 'application/nquads':
71 72
def graphAsJson(g):
    """
    Return graph g as a JSON-LD string.

    Built with from_rdf() rather than g.serialize(format='json-ld')
    on purpose: the serialize() version omits literal datatypes.
    """
    jsonLdDoc = from_rdf(g)
    return json.dumps(jsonLdDoc)
76 77
# Numbers the PatchableGraph instances made in this process so each one
# gets its own stats path.
_graphsInProcess = itertools.count()


class PatchableGraph(GraphEditApi):
    """
    Master graph that you modify with self.patch, and we get the
    updates to all current listeners.

    Stats for each instance are registered with scales under
    '/patchableGraph<N>', where N comes from _graphsInProcess.
    """
    def __init__(self):
        self._graph = ConjunctiveGraph()
        self._observers = []
        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))

    _serialize = scales.PmfStat('serialize')

    def serialize(self, to, **kw):
        """
        Serialize the underlying graph, timing the call.

        `to` and any keyword args are passed straight through to the
        graph's own serialize().
        """
        with self._serialize.time():
            return self._graph.serialize(to, **kw)

    _patch = scales.PmfStat('patch')
    _len = scales.IntStat('statementCount')

    def patch(self, p):
        """
        Apply Patch p to the graph and forward it to every observer.

        Only the quads that would actually change the graph are applied:
        deletes that aren't in the graph and adds that already are get
        dropped first. If nothing is left, the graph is untouched and no
        observer is called.
        """
        with self._patch.time():
            # assuming no stmt is both in p.addQuads and p.delQuads.
            # NOTE(review): inGraph is not among the imports visible at
            # the top of this file; confirm it is in scope (presumably
            # it should come from rdfdb.rdflibpatch next to patchQuads).
            dels = {q for q in p.delQuads if inGraph(q, self._graph)}
            adds = {q for q in p.addQuads if not inGraph(q, self._graph)}
            minimizedP = Patch(addQuads=adds, delQuads=dels)
            if minimizedP.isNoop():
                return
            patchQuads(self._graph,
                       deleteQuads=dels,
                       addQuads=adds,
                       perfect=False) # true?
            if self._observers:
                # Every observer gets the same payload, so build the JSON
                # once instead of once per observer.
                # NOTE(review): this is the patch as given (p), not
                # minimizedP, so observers can see adds/deletes that were
                # no-ops here. Confirm that is intended.
                patchJson = patchAsJson(p)
                for ob in self._observers:
                    ob(patchJson)
            self._len = len(self._graph)

    def asJsonLd(self):
        """Return the current graph as a JSON-LD string (see graphAsJson)."""
        return graphAsJson(self._graph)

    _currentObservers = scales.IntStat('observers/current')
    _observersAdded = scales.IntStat('observers/added')

    def addObserver(self, onPatch):
        """
        Register onPatch to be called with the patch JSON string after
        each patch that changes the graph.
        """
        self._observers.append(onPatch)
        self._currentObservers = len(self._observers)
        self._observersAdded += 1

    def removeObserver(self, onPatch):
        """Unregister onPatch. Removing an unknown observer is not an error."""
        try:
            self._observers.remove(onPatch)
        except ValueError:
            pass
        self._currentObservers = len(self._observers)

    def setToGraph(self, newGraph):
        """Patch the graph with the diff between its current state and newGraph."""
        self.patch(Patch.fromDiff(self._graph, newGraph))

    # Timers used by the cyclone handlers that serve this graph; they are
    # reached through the handler's masterGraph attribute.
    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
    _sendPatch = scales.PmfStat('serve/events/sendPatch')
114 135
class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    Serves the whole master graph in response to a GET, in the format
    chosen by the request's accept header.
    """
    def initialize(self, masterGraph):
        self.masterGraph = masterGraph

    def get(self):
        # The write is timed under the graph's 'serve/simpleGraph' stat.
        with self.masterGraph._sendSimpleGraph.time():
            accept = self.request.headers.get('accept')
            writeGraphResponse(self, self.masterGraph, accept)
144
145
class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
    """
    One session with one client.

    Sends the current graph as a 'fullGraph' event, then every later
    patch as a 'patch' event, so the remote copy can follow along.

    NOTE(review): the middle of the original docstring is not visible
    here. It ends by pointing at nginx's proxy_buffering setting,
    presumably as advice for serving this behind nginx:
    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
    """
    def __init__(self, application, request, masterGraph):
        cyclone.sse.SSEHandler.__init__(self, application, request)
        self.masterGraph = masterGraph

    def bind(self):
        """Send the full graph, then subscribe to future patches."""
        master = self.masterGraph
        with master._sendFullGraph.time():
            fullJson = master.asJsonLd()
            log.debug("send fullGraph event: %s", fullJson)
            self.sendEvent(message=fullJson, event=b'fullGraph')
            master.addObserver(self.onPatch)

    def onPatch(self, patchJson):
        """Forward one patch (already a JSON string) to the client."""
        with self.masterGraph._sendPatch.time():
            # throttle and combine patches here- ideally we could see how
            # long the latency to the client is to make a better rate choice
            self.sendEvent(message=patchJson, event=b'patch')

    def unbind(self):
        """Stop receiving patches for this client."""
        self.masterGraph.removeObserver(self.onPatch)
151 176