changeset 880:e09e3d1d83d5

slightly better behavior when the graph is out of sync. add LocalSubmaster type
author drewp@bigasterisk.com
date Sat, 08 Jun 2013 01:57:30 +0000
parents 6504df2aee03
children 94ca7d024fe1
files bin/subcomposer light9/rdfdb/patchsender.py light9/rdfdb/syncedgraph.py
diffstat 3 files changed, 37 insertions(+), 12 deletions(-)
--- a/bin/subcomposer	Sat Jun 08 00:54:56 2013 +0000
+++ b/bin/subcomposer	Sat Jun 08 01:57:30 2013 +0000
@@ -109,12 +109,14 @@
             graph.patchObject(self.session,
                               self.session, L9['currentSub'], newSub.uri)
 
-            if newSub and 'local' in newSub.uri: # wrong- use rdf:type or something?
-                self._currentChoice(Local)
-            else:
-                # i think right here is the point that the last local
-                # becomes garbage, and we could clean it up. 
-                self._currentChoice(newSub.uri)
+            with graph.currentState() as current:
+                
+                if newSub and (newSub.uri, RDF.type, L9['LocalSubmaster']) in current:
+                    self._currentChoice(Local)
+                else:
+                    # i think right here is the point that the last local
+                    # becomes garbage, and we could clean it up. 
+                    self._currentChoice(newSub.uri)
 
         dispatcher.connect(self.levelsChanged, "sub levels changed")
             
@@ -136,7 +138,10 @@
         # subcomposer process and don't use PersistentSubmaster at all?
 
         new = URIRef("http://local/%s" % time.time())
-        self.graph.patch(Patch(addQuads=[(new, RDF.type, L9['Submaster'], self.session)]))
+        self.graph.patch(Patch(addQuads=[
+            (new, RDF.type, L9['Submaster'], self.session),
+            (new, RDF.type, L9['LocalSubmaster'], self.session),
+        ]))
         
         return new
         
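The subcomposer change above replaces a fragile substring test
('local' in newSub.uri) with an explicit rdf:type check, and the
second hunk tags each new local submaster with both types so that
check can succeed. As a minimal sketch of the same membership test on
a plain rdflib graph (the L9 namespace URI and session URI below are
stand-ins, and an ordinary ConjunctiveGraph substitutes for
SyncedGraph's currentState() view):

    from rdflib import ConjunctiveGraph, Namespace, URIRef
    from rdflib.namespace import RDF

    L9 = Namespace("http://light9.bigasterisk.com/")  # assumed namespace
    session = URIRef("http://example/session1")       # hypothetical session

    g = ConjunctiveGraph()
    new = URIRef("http://local/1370656650.0")

    # mirror the two addQuads: every local submaster is both a
    # Submaster and a LocalSubmaster
    ctx = g.get_context(session)
    ctx.add((new, RDF.type, L9['Submaster']))
    ctx.add((new, RDF.type, L9['LocalSubmaster']))

    # the membership test used in the first hunk
    assert (new, RDF.type, L9['LocalSubmaster']) in g
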
--- a/light9/rdfdb/patchsender.py	Sat Jun 08 00:54:56 2013 +0000
+++ b/light9/rdfdb/patchsender.py	Sat Jun 08 01:57:30 2013 +0000
@@ -27,6 +27,15 @@
         self._patchesToSend.append((p, sendResult))
         self._continueSending()
         return sendResult
+        
+    def cancelAll(self):
+        self._patchesToSend[:] = []
+        # we might be in the middle of a post; ideally that would be
+        # aborted too. Since that's not coded yet, and it might be too
+        # late to abort anyway, what should happen?
+        # 1. this could return a deferred until we think our posts have stopped
+        # 2. or, other code could deal with the fact that cancelAll
+        # isn't perfect
 
     def _continueSending(self):
         if not self._patchesToSend or self._currentSendPatchRequest:
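The cancelAll comment lists two options; option 1 (return a deferred
until our posts have stopped) could look roughly like the sketch
below. This is speculative, not part of the changeset, and it assumes
_currentSendPatchRequest holds the Deferred for the in-flight POST,
which is how _continueSending appears to gate on it:

    from twisted.internet import defer

    def cancelAll(self):
        self._patchesToSend[:] = []
        if self._currentSendPatchRequest is None:
            return defer.succeed(None)
        # fire once the in-flight post finishes, success or failure;
        # callers only need to know that sending has stopped
        done = defer.Deferred()
        def finished(result):
            done.callback(None)
            return result  # pass the original outcome down the chain
        self._currentSendPatchRequest.addBoth(finished)
        return done
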
--- a/light9/rdfdb/syncedgraph.py	Sat Jun 08 00:54:56 2013 +0000
+++ b/light9/rdfdb/syncedgraph.py	Sat Jun 08 01:57:30 2013 +0000
@@ -64,6 +64,8 @@
         self._sender = PatchSender('http://localhost:8051/patches',
                                    self._receiver.updateResource)
         AutoDepGraphApi.__init__(self)
+        # this needs more state to track if we're doing a resync (and
+        # everything has to error or wait) or if we're live
         
     def resync(self):
         """
@@ -79,14 +81,17 @@
         should just fail them. There should be a notification back to
         UIs who want to show that we're doing a resync.
         """
+        self._sender.cancelAll()
+        # this should be locked so only one resync goes on at once
         return cyclone.httpclient.fetch(
             url="http://localhost:8051/graph",
             method="GET",
-            headers={'Accept':'x-trig'},
+            headers={'Accept':['x-trig']},
             ).addCallback(self._resyncGraph)
 
     def _resyncGraph(self, response):
-        pass
+        log.warn("new graph in")
+        
         #diff against old entire graph
         #broadcast that change
 
@@ -98,7 +103,11 @@
         # Rerequest the full state from the server, try the patch
         # again after that, then give up.
         log.info("del %s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
-        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        try:
+            patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        except ValueError:
+            self.sendFailed(None)
+            return
         self.runDepsOnNewPatch(p)
         self._sender.sendPatch(p).addErrback(self.sendFailed)
 
@@ -107,9 +116,11 @@
         we asked for a patch to be queued and sent to the master, and
         that ultimately failed because of a conflict
         """
-        print "sendFailed"
+        log.warn("sendFailed")
+        self.resync()
+        
         #i think we should receive back all the pending patches,
-        #do a resysnc here,
+        #do a resync here,
         #then requeue all the pending patches (minus the failing one?) after that's done.
 
     def _onPatch(self, p):
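The two TODOs in resync above (lock it so only one resync runs at
once, and track resync-vs-live state) could be met with Twisted's
DeferredLock, roughly as below. This is a sketch of one way to satisfy
those notes, not the changeset's code; only resync, _sender, and
_resyncGraph come from the file above:

    from twisted.internet import defer
    import cyclone.httpclient

    # in SyncedGraph.__init__:
    #     self._resyncLock = defer.DeferredLock()

    def resync(self):
        # DeferredLock.run() queues callers, so overlapping resync
        # requests serialize instead of racing each other
        return self._resyncLock.run(self._startResync)

    def _startResync(self):
        self._sender.cancelAll()
        return cyclone.httpclient.fetch(
            url="http://localhost:8051/graph",
            method="GET",
            headers={'Accept': ['x-trig']},
            ).addCallback(self._resyncGraph)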