Mercurial > code > home > repos > homeauto
comparison lib/patchablegraph.py @ 473:388769b5f8ff
stats support and maybe a no-op filtering logic change snuck in there
Ignore-this: d54125308243159b28ef11e2d09014f4
author | drewp@bigasterisk.com |
---|---|
date | Sat, 20 Apr 2019 23:51:02 -0700 |
parents | fcd2c026f51e |
children | 1d2817cb9a6f |
comparison legend: equal | deleted | inserted | replaced
472:a63549a50b3f | 473:388769b5f8ff |
---|---|
18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF | 18 * https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF |
19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of | 19 * https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of |
20 differences between RDF graphs | 20 differences between RDF graphs |
21 | 21 |
22 """ | 22 """ |
23 import sys, json, logging | 23 import sys, json, logging, itertools |
24 import cyclone.sse | 24 import cyclone.sse |
25 sys.path.append("/my/proj/rdfdb") | 25 sys.path.append("/my/proj/rdfdb") |
26 from rdfdb.grapheditapi import GraphEditApi | 26 from rdfdb.grapheditapi import GraphEditApi |
27 from rdflib import ConjunctiveGraph | 27 from rdflib import ConjunctiveGraph |
28 from rdfdb.rdflibpatch import patchQuads | 28 from rdfdb.rdflibpatch import patchQuads |
29 from rdfdb.patch import Patch | 29 from rdfdb.patch import Patch |
30 from rdflib_jsonld.serializer import from_rdf | 30 from rdflib_jsonld.serializer import from_rdf |
31 from rdflib.parser import StringInputSource | 31 from rdflib.parser import StringInputSource |
32 from cycloneerr import PrettyErrorHandler | 32 from cycloneerr import PrettyErrorHandler |
33 from greplin import scales | |
33 | 34 |
34 log = logging.getLogger('patchablegraph') | 35 log = logging.getLogger('patchablegraph') |
35 | 36 |
36 def writeGraphResponse(req, graph, acceptHeader): | 37 def writeGraphResponse(req, graph, acceptHeader): |
37 if acceptHeader == 'application/nquads': | 38 if acceptHeader == 'application/nquads': |
71 | 72 |
72 def graphAsJson(g): | 73 def graphAsJson(g): |
73 # This is not the same as g.serialize(format='json-ld')! That | 74 # This is not the same as g.serialize(format='json-ld')! That |
74 # version omits literal datatypes. | 75 # version omits literal datatypes. |
75 return json.dumps(from_rdf(g)) | 76 return json.dumps(from_rdf(g)) |
76 | 77 |
78 _graphsInProcess = itertools.count() | |
77 class PatchableGraph(GraphEditApi): | 79 class PatchableGraph(GraphEditApi): |
78 """ | 80 """ |
79 Master graph that you modify with self.patch, and we get the | 81 Master graph that you modify with self.patch, and we get the |
80 updates to all current listeners. | 82 updates to all current listeners. |
81 """ | 83 """ |
82 def __init__(self): | 84 def __init__(self): |
83 self._graph = ConjunctiveGraph() | 85 self._graph = ConjunctiveGraph() |
84 self._observers = [] | 86 self._observers = [] |
87 scales.init(self, '/patchableGraph%s' % next(_graphsInProcess)) | |
85 | 88 |
89 _serialize = scales.PmfStat('serialize') | |
86 def serialize(self, to, **kw): | 90 def serialize(self, to, **kw): |
87 return self._graph.serialize(to, **kw) | 91 with self._serialize.time(): |
88 | 92 return self._graph.serialize(to, **kw) |
93 | |
94 _patch = scales.PmfStat('patch') | |
95 _len = scales.IntStat('statementCount') | |
89 def patch(self, p): | 96 def patch(self, p): |
90 if p.isNoop(): | 97 with self._patch.time(): |
91 return | 98 # assuming no stmt is both in p.addQuads and p.delQuads. |
92 patchQuads(self._graph, | 99 dels = set([q for q in p.delQuads if inGraph(q, self._graph)]) |
93 deleteQuads=p.delQuads, | 100 adds = set([q for q in p.addQuads if not inGraph(q, self._graph)]) |
94 addQuads=p.addQuads, | 101 minimizedP = Patch(addQuads=adds, delQuads=dels) |
95 perfect=False) # true? | 102 if minimizedP.isNoop(): |
96 for ob in self._observers: | 103 return |
97 ob(patchAsJson(p)) | 104 patchQuads(self._graph, |
105 deleteQuads=dels, | |
106 addQuads=adds, | |
107 perfect=False) # true? | |
108 for ob in self._observers: | |
109 ob(patchAsJson(p)) | |
110 self._len = len(self._graph) | |
98 | 111 |
99 def asJsonLd(self): | 112 def asJsonLd(self): |
100 return graphAsJson(self._graph) | 113 return graphAsJson(self._graph) |
101 | 114 |
115 _currentObservers = scales.IntStat('observers/current') | |
116 _observersAdded = scales.IntStat('observers/added') | |
102 def addObserver(self, onPatch): | 117 def addObserver(self, onPatch): |
103 self._observers.append(onPatch) | 118 self._observers.append(onPatch) |
119 self._currentObservers = len(self._observers) | |
120 self._observersAdded += 1 | |
104 | 121 |
105 def removeObserver(self, onPatch): | 122 def removeObserver(self, onPatch): |
106 try: | 123 try: |
107 self._observers.remove(onPatch) | 124 self._observers.remove(onPatch) |
108 except ValueError: | 125 except ValueError: |
109 pass | 126 pass |
127 self._currentObservers = len(self._observers) | |
110 | 128 |
111 def setToGraph(self, newGraph): | 129 def setToGraph(self, newGraph): |
112 self.patch(Patch.fromDiff(self._graph, newGraph)) | 130 self.patch(Patch.fromDiff(self._graph, newGraph)) |
113 | 131 |
132 _sendSimpleGraph = scales.PmfStat('serve/simpleGraph') | |
133 _sendFullGraph = scales.PmfStat('serve/events/sendFull') | |
134 _sendPatch = scales.PmfStat('serve/events/sendPatch') | |
114 | 135 |
115 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): | 136 class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler): |
116 def initialize(self, masterGraph): | 137 def initialize(self, masterGraph): |
117 self.masterGraph = masterGraph | 138 self.masterGraph = masterGraph |
118 | 139 |
119 def get(self): | 140 def get(self): |
120 writeGraphResponse(self, self.masterGraph, | 141 with self.masterGraph._sendSimpleGraph.time(): |
121 self.request.headers.get('accept')) | 142 writeGraphResponse(self, self.masterGraph, |
122 | 143 self.request.headers.get('accept')) |
144 | |
145 | |
123 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): | 146 class CycloneGraphEventsHandler(cyclone.sse.SSEHandler): |
124 """ | 147 """ |
125 One session with one client. | 148 One session with one client. |
126 | 149 |
127 returns current graph plus future patches to keep remote version | 150 returns current graph plus future patches to keep remote version |
132 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering | 155 http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering |
133 """ | 156 """ |
134 def __init__(self, application, request, masterGraph): | 157 def __init__(self, application, request, masterGraph): |
135 cyclone.sse.SSEHandler.__init__(self, application, request) | 158 cyclone.sse.SSEHandler.__init__(self, application, request) |
136 self.masterGraph = masterGraph | 159 self.masterGraph = masterGraph |
137 | 160 |
138 def bind(self): | 161 def bind(self): |
139 graphJson = self.masterGraph.asJsonLd() | 162 with self.masterGraph._sendFullGraph.time(): |
140 log.debug("send fullGraph event: %s", graphJson) | 163 graphJson = self.masterGraph.asJsonLd() |
141 self.sendEvent(message=graphJson, event=b'fullGraph') | 164 log.debug("send fullGraph event: %s", graphJson) |
142 self.masterGraph.addObserver(self.onPatch) | 165 self.sendEvent(message=graphJson, event=b'fullGraph') |
166 self.masterGraph.addObserver(self.onPatch) | |
143 | 167 |
144 def onPatch(self, patchJson): | 168 def onPatch(self, patchJson): |
145 # throttle and combine patches here- ideally we could see how | 169 with self.masterGraph._sendPatch.time(): |
146 # long the latency to the client is to make a better rate choice | 170 # throttle and combine patches here- ideally we could see how |
147 self.sendEvent(message=patchJson, event=b'patch') | 171 # long the latency to the client is to make a better rate choice |
172 self.sendEvent(message=patchJson, event=b'patch') | |
148 | 173 |
149 def unbind(self): | 174 def unbind(self): |
150 self.masterGraph.removeObserver(self.onPatch) | 175 self.masterGraph.removeObserver(self.onPatch) |
151 | 176 |