Changeset - 2bf4b72cb5e8
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 12 years ago 2013-03-26 08:17:59
drewp@bigasterisk.com
fix layering issue with runDepsOnNewPatch
Ignore-this: 31489a0371444ba4201c29579a168a40
3 files changed with 31 insertions and 33 deletions:
0 comments (0 inline, 0 general)
light9/rdfdb/autodepgraphapi.py
Show inline comments
 
@@ -41,7 +41,7 @@ class AutoDepGraphApi(object):
 
        finally:
 
            self.currentFuncs.pop()
 

	
 
    def updateOnPatch(self, p):
 
    def runDepsOnNewPatch(self, p):
 
        """
 
        patch p just happened to the graph; call everyone back who
 
        might care, and then notice what data they depend on now
light9/rdfdb/patchreceiver.py
Show inline comments
 
@@ -10,55 +10,34 @@ class PatchReceiver(object):
 
    master. See onPatch for what happens when the rdfdb master sends
 
    us a patch
 
    """
 
    def __init__(self, graph, label, initiallySynced):
 
    def __init__(self, label, onPatch):
 
        """
 
        label is what we'll call ourselves to the rdfdb server
 

	
 
        initiallySynced is a deferred that we'll call back when we get
 
        the first patch from the server
 
        onPatch is what we call back when the server sends a patch
 
        """
 
        self.graph = graph
 
        self.initiallySynced = initiallySynced
 
        
 
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
 
            (r'/update', makePatchEndpoint(self._onPatch)),
 
            (r'/update', makePatchEndpoint(onPatch)),
 
        ]))
 
        port = listen._realPortNumber  # what's the right call for this?
 
        self.updateResource = 'http://localhost:%s/update' % port
 
        log.info("listening on %s" % port)
 
        self._register(label)
 

	
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        log.info("graph now has %s statements" % len(self.graph))
 
        try:
 
            self.updateOnPatch(p)
 
        except Exception:
 
            # don't reflect this back to the server; we did
 
            # receive its patch correctly.
 
            traceback.print_exc()
 

	
 
        if self.initiallySynced:
 
            self.initiallySynced.callback(None)
 
            self.initiallySynced = None
 

	
 
    def _register(self, label):
 

	
 
        def done(x):
 
            log.debug("registered with rdfdb")
 

	
 
        cyclone.httpclient.fetch(
 
            url='http://localhost:8051/graphClients',
 
            method='POST',
 
            headers={'Content-Type': ['application/x-www-form-urlencoded']},
 
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
 
                                       ('label', label)]),
 
            ).addCallbacks(done, log.error)
 
            ).addCallbacks(self._done, log.error)
 
        log.info("registering with rdfdb")
 

	
 
    def _done(self, x):
 
        log.debug("registered with rdfdb")
 
    
 
        
 
def makePatchEndpointPutMethod(cb):
 
    def put(self):
light9/rdfdb/syncedgraph.py
Show inline comments
 
@@ -17,7 +17,7 @@ PatchSender - collects and transmits you
 
"""
 

	
 
from rdflib import ConjunctiveGraph
 
import logging, cyclone.httpclient
 
import logging, cyclone.httpclient, traceback
 
from twisted.internet import defer
 
log = logging.getLogger('syncedgraph')
 
from light9.rdfdb.rdflibpatch import patchQuads
 
@@ -57,9 +57,9 @@ class SyncedGraph(CurrentStateGraphApi, 
 
        with your connection
 
        """
 
        self.initiallySynced = defer.Deferred()
 
        _graph = self._graph = ConjunctiveGraph()
 
        self._graph = ConjunctiveGraph()
 

	
 
        self._receiver = PatchReceiver(_graph, label, self.initiallySynced)
 
        self._receiver = PatchReceiver(label, self._onPatch)
 
        
 
        self._sender = PatchSender('http://localhost:8051/patches',
 
                                   self._receiver.updateResource)
 
@@ -99,7 +99,7 @@ class SyncedGraph(CurrentStateGraphApi, 
 
        # again after that, then give up.
 
        log.info("%s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        self.updateOnPatch(p)
 
        self.runDepsOnNewPatch(p)
 
        self._sender.sendPatch(p).addErrback(self.sendFailed)
 

	
 
    def sendFailed(self, result):
 
@@ -111,3 +111,22 @@ class SyncedGraph(CurrentStateGraphApi, 
 
        #i think we should receive back all the pending patches,
 
        #do a resysnc here,
 
        #then requeue all the pending patches (minus the failing one?) after that's done.
 

	
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        log.info("graph now has %s statements" % len(self._graph))
 
        try:
 
            self.runDepsOnNewPatch(p)
 
        except Exception:
 
            # don't reflect this error back to the server; we did
 
            # receive its patch correctly. However, we're in a bad
 
            # state since some dependencies may not have rerun
 
            traceback.print_exc()
 
            log.warn("some dependencies may not have rerun")
 

	
 
        if self.initiallySynced:
 
            self.initiallySynced.callback(None)
 
            self.initiallySynced = None
0 comments (0 inline, 0 general)