changeset 845:2bf4b72cb5e8

fix layering issue with runDepsOnNewPatch Ignore-this: 31489a0371444ba4201c29579a168a40
author drewp@bigasterisk.com
date Tue, 26 Mar 2013 08:17:59 +0000
parents 51adfea492a5
children 89afd38433e8
files light9/rdfdb/autodepgraphapi.py light9/rdfdb/patchreceiver.py light9/rdfdb/syncedgraph.py
diffstat 3 files changed, 31 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/light9/rdfdb/autodepgraphapi.py	Tue Mar 26 07:59:48 2013 +0000
+++ b/light9/rdfdb/autodepgraphapi.py	Tue Mar 26 08:17:59 2013 +0000
@@ -41,7 +41,7 @@
         finally:
             self.currentFuncs.pop()
 
-    def updateOnPatch(self, p):
+    def runDepsOnNewPatch(self, p):
         """
         patch p just happened to the graph; call everyone back who
         might care, and then notice what data they depend on now
--- a/light9/rdfdb/patchreceiver.py	Tue Mar 26 07:59:48 2013 +0000
+++ b/light9/rdfdb/patchreceiver.py	Tue Mar 26 08:17:59 2013 +0000
@@ -10,55 +10,34 @@
     master. See onPatch for what happens when the rdfdb master sends
     us a patch
     """
-    def __init__(self, graph, label, initiallySynced):
+    def __init__(self, label, onPatch):
         """
         label is what we'll call ourselves to the rdfdb server
 
-        initiallySynced is a deferred that we'll call back when we get
-        the first patch from the server
+        onPatch is what we call back when the server sends a patch
         """
-        self.graph = graph
-        self.initiallySynced = initiallySynced
-        
         listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
-            (r'/update', makePatchEndpoint(self._onPatch)),
+            (r'/update', makePatchEndpoint(onPatch)),
         ]))
         port = listen._realPortNumber  # what's the right call for this?
         self.updateResource = 'http://localhost:%s/update' % port
         log.info("listening on %s" % port)
         self._register(label)
 
-    def _onPatch(self, p):
-        """
-        central server has sent us a patch
-        """
-        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
-        log.info("graph now has %s statements" % len(self.graph))
-        try:
-            self.updateOnPatch(p)
-        except Exception:
-            # don't reflect this back to the server; we did
-            # receive its patch correctly.
-            traceback.print_exc()
-
-        if self.initiallySynced:
-            self.initiallySynced.callback(None)
-            self.initiallySynced = None
-
     def _register(self, label):
 
-        def done(x):
-            log.debug("registered with rdfdb")
-
         cyclone.httpclient.fetch(
             url='http://localhost:8051/graphClients',
             method='POST',
             headers={'Content-Type': ['application/x-www-form-urlencoded']},
             postdata=urllib.urlencode([('clientUpdate', self.updateResource),
                                        ('label', label)]),
-            ).addCallbacks(done, log.error)
+            ).addCallbacks(self._done, log.error)
         log.info("registering with rdfdb")
 
+    def _done(self, x):
+        log.debug("registered with rdfdb")
+    
         
 def makePatchEndpointPutMethod(cb):
     def put(self):
--- a/light9/rdfdb/syncedgraph.py	Tue Mar 26 07:59:48 2013 +0000
+++ b/light9/rdfdb/syncedgraph.py	Tue Mar 26 08:17:59 2013 +0000
@@ -17,7 +17,7 @@
 """
 
 from rdflib import ConjunctiveGraph
-import logging, cyclone.httpclient
+import logging, cyclone.httpclient, traceback
 from twisted.internet import defer
 log = logging.getLogger('syncedgraph')
 from light9.rdfdb.rdflibpatch import patchQuads
@@ -57,9 +57,9 @@
         with your connection
         """
         self.initiallySynced = defer.Deferred()
-        _graph = self._graph = ConjunctiveGraph()
+        self._graph = ConjunctiveGraph()
 
-        self._receiver = PatchReceiver(_graph, label, self.initiallySynced)
+        self._receiver = PatchReceiver(label, self._onPatch)
         
         self._sender = PatchSender('http://localhost:8051/patches',
                                    self._receiver.updateResource)
@@ -99,7 +99,7 @@
         # again after that, then give up.
         log.info("%s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
         patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        self.updateOnPatch(p)
+        self.runDepsOnNewPatch(p)
         self._sender.sendPatch(p).addErrback(self.sendFailed)
 
     def sendFailed(self, result):
@@ -111,3 +111,22 @@
         #i think we should receive back all the pending patches,
         #do a resysnc here,
         #then requeue all the pending patches (minus the failing one?) after that's done.
+
+    def _onPatch(self, p):
+        """
+        central server has sent us a patch
+        """
+        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        log.info("graph now has %s statements" % len(self._graph))
+        try:
+            self.runDepsOnNewPatch(p)
+        except Exception:
+            # don't reflect this error back to the server; we did
+            # receive its patch correctly. However, we're in a bad
+            # state since some dependencies may not have rerun
+            traceback.print_exc()
+            log.warn("some dependencies may not have rerun")
+
+        if self.initiallySynced:
+            self.initiallySynced.callback(None)
+            self.initiallySynced = None