Changeset - 2bf4b72cb5e8
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 12 years ago 2013-03-26 08:17:59
drewp@bigasterisk.com
fix layering issue with runDepsOnNewPatch
Ignore-this: 31489a0371444ba4201c29579a168a40
3 files changed with 31 insertions and 33 deletions:
0 comments (0 inline, 0 general)
light9/rdfdb/autodepgraphapi.py
Show inline comments
 
@@ -38,13 +38,13 @@ class AutoDepGraphApi(object):
 
        self.currentFuncs.append(func)
 
        try:
 
            func()
 
        finally:
 
            self.currentFuncs.pop()
 

	
 
    def updateOnPatch(self, p):
 
    def runDepsOnNewPatch(self, p):
 
        """
 
        patch p just happened to the graph; call everyone back who
 
        might care, and then notice what data they depend on now
 
        """
 
        for func in self._watchers.whoCares(p):
 
            # todo: forget the old handlers for this func
light9/rdfdb/patchreceiver.py
Show inline comments
 
@@ -7,61 +7,40 @@ log = logging.getLogger('syncedgraph')
 
class PatchReceiver(object):
 
    """
 
    runs a web server in this process and registers it with the rdfdb
 
    master. See onPatch for what happens when the rdfdb master sends
 
    us a patch
 
    """
 
    def __init__(self, graph, label, initiallySynced):
 
    def __init__(self, label, onPatch):
 
        """
 
        label is what we'll call ourselves to the rdfdb server
 

	
 
        initiallySynced is a deferred that we'll call back when we get
 
        the first patch from the server
 
        onPatch is what we call back when the server sends a patch
 
        """
 
        self.graph = graph
 
        self.initiallySynced = initiallySynced
 
        
 
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
 
            (r'/update', makePatchEndpoint(self._onPatch)),
 
            (r'/update', makePatchEndpoint(onPatch)),
 
        ]))
 
        port = listen._realPortNumber  # what's the right call for this?
 
        self.updateResource = 'http://localhost:%s/update' % port
 
        log.info("listening on %s" % port)
 
        self._register(label)
 

	
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        log.info("graph now has %s statements" % len(self.graph))
 
        try:
 
            self.updateOnPatch(p)
 
        except Exception:
 
            # don't reflect this back to the server; we did
 
            # receive its patch correctly.
 
            traceback.print_exc()
 

	
 
        if self.initiallySynced:
 
            self.initiallySynced.callback(None)
 
            self.initiallySynced = None
 

	
 
    def _register(self, label):
 

	
 
        def done(x):
 
            log.debug("registered with rdfdb")
 

	
 
        cyclone.httpclient.fetch(
 
            url='http://localhost:8051/graphClients',
 
            method='POST',
 
            headers={'Content-Type': ['application/x-www-form-urlencoded']},
 
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
 
                                       ('label', label)]),
 
            ).addCallbacks(done, log.error)
 
            ).addCallbacks(self._done, log.error)
 
        log.info("registering with rdfdb")
 

	
 
    def _done(self, x):
 
        log.debug("registered with rdfdb")
 
    
 
        
 
def makePatchEndpointPutMethod(cb):
 
    def put(self):
 
        try:
 
            p = Patch(jsonRepr=self.request.body)
 
            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
light9/rdfdb/syncedgraph.py
Show inline comments
 
@@ -14,13 +14,13 @@ operations, e.g. replacing a value, or e
 
PatchReceiver - our web server that listens to edits from the master graph
 

	
 
PatchSender - collects and transmits your graph edits
 
"""
 

	
 
from rdflib import ConjunctiveGraph
 
import logging, cyclone.httpclient
 
import logging, cyclone.httpclient, traceback
 
from twisted.internet import defer
 
log = logging.getLogger('syncedgraph')
 
from light9.rdfdb.rdflibpatch import patchQuads
 

	
 
from light9.rdfdb.patchsender import PatchSender
 
from light9.rdfdb.patchreceiver import PatchReceiver
 
@@ -54,15 +54,15 @@ class SyncedGraph(CurrentStateGraphApi, 
 
    def __init__(self, label):
 
        """
 
        label is a string that the server will display in association
 
        with your connection
 
        """
 
        self.initiallySynced = defer.Deferred()
 
        _graph = self._graph = ConjunctiveGraph()
 
        self._graph = ConjunctiveGraph()
 

	
 
        self._receiver = PatchReceiver(_graph, label, self.initiallySynced)
 
        self._receiver = PatchReceiver(label, self._onPatch)
 
        
 
        self._sender = PatchSender('http://localhost:8051/patches',
 
                                   self._receiver.updateResource)
 
        AutoDepGraphApi.__init__(self)
 
        
 
    def resync(self):
 
@@ -96,18 +96,37 @@ class SyncedGraph(CurrentStateGraphApi, 
 

	
 
        # these could fail if we're out of sync. One approach:
 
        # Rerequest the full state from the server, try the patch
 
        # again after that, then give up.
 
        log.info("%s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        self.updateOnPatch(p)
 
        self.runDepsOnNewPatch(p)
 
        self._sender.sendPatch(p).addErrback(self.sendFailed)
 

	
 
    def sendFailed(self, result):
 
        """
 
        we asked for a patch to be queued and sent to the master, and
 
        that ultimately failed because of a conflict
 
        """
 
        print "sendFailed"
 
        #i think we should receive back all the pending patches,
 
        #do a resysnc here,
 
        #then requeue all the pending patches (minus the failing one?) after that's done.
 

	
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        log.info("graph now has %s statements" % len(self._graph))
 
        try:
 
            self.runDepsOnNewPatch(p)
 
        except Exception:
 
            # don't reflect this error back to the server; we did
 
            # receive its patch correctly. However, we're in a bad
 
            # state since some dependencies may not have rerun
 
            traceback.print_exc()
 
            log.warn("some dependencies may not have rerun")
 

	
 
        if self.initiallySynced:
 
            self.initiallySynced.callback(None)
 
            self.initiallySynced = None
0 comments (0 inline, 0 general)