diff lib/patchablegraph/patchablegraph.py @ 514:495f573af4f4

make patchablegraph release Ignore-this: f55c9a56b052797ff23a80630714b51a
author drewp@bigasterisk.com
date Mon, 22 Apr 2019 23:29:19 -0700
parents lib/patchablegraph.py@1d2817cb9a6f
children 83ccc9ba90ea
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/lib/patchablegraph/patchablegraph.py	Mon Apr 22 23:29:19 2019 -0700
@@ -0,0 +1,177 @@
+"""
+Design:
+
+1. Services each have (named) graphs, which they patch as things
+   change. PatchableGraph is an object for holding this graph.
+2. You can http GET that graph, or ...
+3. You can http GET/SSE that graph and hear about modifications to it
+4. The client that got the graph holds and maintains a copy. The
+   client may merge together multiple graphs.
+5. Client queries its graph with low-level APIs or client-side sparql.
+6. When the graph changes, the client knows and can update itself at
+   low or high granularity.
+
+
+See also:
+* http://iswc2007.semanticweb.org/papers/533.pdf RDFSync: efficient remote synchronization of RDF
+models
+* https://www.w3.org/2009/12/rdf-ws/papers/ws07 Supporting Change Propagation in RDF
+* https://www.w3.org/DesignIssues/lncs04/Diff.pdf Delta: an ontology for the distribution of
+differences between RDF graphs
+
+"""
+import json, logging, itertools
+
+from greplin import scales
+from rdfdb.grapheditapi import GraphEditApi
+from rdflib import ConjunctiveGraph
+from rdflib.parser import StringInputSource
+from rdflib_jsonld.serializer import from_rdf
+import cyclone.sse
+
+from cycloneerr import PrettyErrorHandler
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads, inGraph
+
+log = logging.getLogger('patchablegraph')
+
+def writeGraphResponse(req, graph, acceptHeader):
+    if acceptHeader == 'application/nquads':
+        req.set_header('Content-type', 'application/nquads')
+        graph.serialize(req, format='nquads')
+    elif acceptHeader == 'application/ld+json':
+        req.set_header('Content-type', 'application/ld+json')
+        graph.serialize(req, format='json-ld', indent=2)
+    else:
+        req.set_header('Content-type', 'application/x-trig')
+        graph.serialize(req, format='trig')
+
+# forked from /my/proj/light9/light9/rdfdb/rdflibpatch.py
+def _graphFromQuads2(q):
+    g = ConjunctiveGraph()
+    #g.addN(q) # no effect on nquad output
+    for s,p,o,c in q:
+        g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
+        #g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+def jsonFromPatch(p):
+    return json.dumps({'patch': {
+        'adds': from_rdf(_graphFromQuads2(p.addQuads)),
+        'deletes': from_rdf(_graphFromQuads2(p.delQuads)),
+    }})
+patchAsJson = jsonFromPatch # deprecated name
+
+    
+def patchFromJson(j):
+    body = json.loads(j)['patch']
+    a = ConjunctiveGraph()
+    a.parse(StringInputSource(json.dumps(body['adds']).encode('utf8')), format='json-ld')
+    d = ConjunctiveGraph()
+    d.parse(StringInputSource(json.dumps(body['deletes']).encode('utf8')), format='json-ld')
+    return Patch(addGraph=a, delGraph=d)
+
+def graphAsJson(g):
+    # This is not the same as g.serialize(format='json-ld')! That
+    # version omits literal datatypes.
+    return json.dumps(from_rdf(g))
+
+_graphsInProcess = itertools.count()
+class PatchableGraph(GraphEditApi):
+    """
+    Master graph that you modify with self.patch, and we get the
+    updates to all current listeners.
+    """
+    def __init__(self):
+        self._graph = ConjunctiveGraph()
+        self._observers = []
+        scales.init(self, '/patchableGraph%s' % next(_graphsInProcess))
+
+    _serialize = scales.PmfStat('serialize')
+    def serialize(self, to, **kw):
+        with self._serialize.time():
+            return self._graph.serialize(to, **kw)
+
+    _patch = scales.PmfStat('patch')
+    _len = scales.IntStat('statementCount')
+    def patch(self, p):
+        with self._patch.time():
+            # assuming no stmt is both in p.addQuads and p.delQuads.
+            dels = set([q for q in p.delQuads if inGraph(q, self._graph)])
+            adds = set([q for q in p.addQuads if not inGraph(q, self._graph)])
+            minimizedP = Patch(addQuads=adds, delQuads=dels)
+            if minimizedP.isNoop():
+                return
+            patchQuads(self._graph,
+                       deleteQuads=dels,
+                       addQuads=adds,
+                       perfect=False) # true?
+            for ob in self._observers:
+                ob(patchAsJson(p))
+            self._len = len(self._graph)
+
+    def asJsonLd(self):
+        return graphAsJson(self._graph)
+
+    _currentObservers = scales.IntStat('observers/current')
+    _observersAdded = scales.IntStat('observers/added')
+    def addObserver(self, onPatch):
+        self._observers.append(onPatch)
+        self._currentObservers = len(self._observers)
+        self._observersAdded += 1
+        
+    def removeObserver(self, onPatch):
+        try:
+            self._observers.remove(onPatch)
+        except ValueError:
+            pass
+        self._currentObservers = len(self._observers)
+
+    def setToGraph(self, newGraph):
+        self.patch(Patch.fromDiff(self._graph, newGraph))
+        
+    _sendSimpleGraph = scales.PmfStat('serve/simpleGraph')
+    _sendFullGraph = scales.PmfStat('serve/events/sendFull')
+    _sendPatch = scales.PmfStat('serve/events/sendPatch')
+
+class CycloneGraphHandler(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def initialize(self, masterGraph):
+        self.masterGraph = masterGraph
+        
+    def get(self):
+        with self.masterGraph._sendSimpleGraph.time():
+            writeGraphResponse(self, self.masterGraph,
+                               self.request.headers.get('accept'))
+
+
+class CycloneGraphEventsHandler(cyclone.sse.SSEHandler):
+    """
+    One session with one client.
+    
+    returns current graph plus future patches to keep remote version
+    in sync with ours.
+
+    intsead of turning off buffering all over, it may work for this
+    response to send 'x-accel-buffering: no', per
+    http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering
+    """
+    def __init__(self, application, request, masterGraph):
+        cyclone.sse.SSEHandler.__init__(self, application, request)
+        self.masterGraph = masterGraph
+
+    def bind(self):
+        with self.masterGraph._sendFullGraph.time():
+            graphJson = self.masterGraph.asJsonLd()
+            log.debug("send fullGraph event: %s", graphJson)
+            self.sendEvent(message=graphJson, event=b'fullGraph')
+            self.masterGraph.addObserver(self.onPatch)
+
+    def onPatch(self, patchJson):
+        with self.masterGraph._sendPatch.time():
+            # throttle and combine patches here- ideally we could see how
+            # long the latency to the client is to make a better rate choice
+            self.sendEvent(message=patchJson, event=b'patch')
+               
+    def unbind(self):
+        self.masterGraph.removeObserver(self.onPatch)
+