Changeset - e09e3d1d83d5
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 12 years ago 2013-06-08 01:57:30
drewp@bigasterisk.com
slightly better behavior when the graph is out of sync. add LocalSubmaster type
Ignore-this: 11f7bc0ff3763685a847ded12bb02191
3 files changed with 31 insertions and 6 deletions:
0 comments (0 inline, 0 general)
bin/subcomposer
Show inline comments
 
@@ -106,13 +106,15 @@ class Subcomposer(tk.Frame):
 
                                  self.session, L9['currentSub'], None)
 
                return
 
            self.sendupdate()
 
            graph.patchObject(self.session,
 
                              self.session, L9['currentSub'], newSub.uri)
 

	
 
            if newSub and 'local' in newSub.uri: # wrong- use rdf:type or something?
 
            with graph.currentState() as current:
 
                
 
                if newSub and (newSub.uri, RDF.type, L9['LocalSubmaster']) in current:
 
                self._currentChoice(Local)
 
            else:
 
                # i think right here is the point that the last local
 
                # becomes garbage, and we could clean it up. 
 
                self._currentChoice(newSub.uri)
 

	
 
@@ -133,13 +135,16 @@ class Subcomposer(tk.Frame):
 
    def makeLocal(self):
 
        # todo: put a type on this, so subChanged can identify it right
 
        # todo: where will these get stored, or are they local to this
 
        # subcomposer process and don't use PersistentSubmaster at all?
 

	
 
        new = URIRef("http://local/%s" % time.time())
 
        self.graph.patch(Patch(addQuads=[(new, RDF.type, L9['Submaster'], self.session)]))
 
        self.graph.patch(Patch(addQuads=[
 
            (new, RDF.type, L9['Submaster'], self.session),
 
            (new, RDF.type, L9['LocalSubmaster'], self.session),
 
        ]))
 
        
 
        return new
 
        
 
    def setupLevelboxUi(self):
 
        self.levelbox = Levelbox(self, self.graph, self.currentSub)
 
        self.levelbox.pack(side='top')
light9/rdfdb/patchsender.py
Show inline comments
 
@@ -25,12 +25,21 @@ class PatchSender(object):
 
    def sendPatch(self, p):
 
        sendResult = defer.Deferred()
 
        self._patchesToSend.append((p, sendResult))
 
        self._continueSending()
 
        return sendResult
 

	
 
    def cancelAll(self):
 
        self._patchesToSend[:] = []
 
        # we might be in the middle of a post; ideally that would be
 
        # aborted, too. Since that's not coded yet, or it might be too late to
 
        # abort, what should happen?
 
        # 1. this could return deferred until we think our posts have stopped
 
        # 2. or, other code could deal for the fact that cancelAll
 
        # isn't perfect
 

	
 
    def _continueSending(self):
 
        if not self._patchesToSend or self._currentSendPatchRequest:
 
            return
 
        if len(self._patchesToSend) > 1:
 
            log.info("%s patches left to send", len(self._patchesToSend))
 
            # this is where we could concatenate little patches into a
light9/rdfdb/syncedgraph.py
Show inline comments
 
@@ -61,12 +61,14 @@ class SyncedGraph(CurrentStateGraphApi, 
 

	
 
        self._receiver = PatchReceiver(label, self._onPatch)
 
        
 
        self._sender = PatchSender('http://localhost:8051/patches',
 
                                   self._receiver.updateResource)
 
        AutoDepGraphApi.__init__(self)
 
        # this needs more state to track if we're doing a resync (and
 
        # everything has to error or wait) or if we're live
 
        
 
    def resync(self):
 
        """
 
        get the whole graph again from the server (e.g. we had a
 
        conflict while applying a patch and want to return to the
 
        truth).
 
@@ -76,43 +78,52 @@ class SyncedGraph(CurrentStateGraphApi, 
 
        corrections.
 

	
 
        Edits you make during a resync will surely be lost, so I
 
        should just fail them. There should be a notification back to
 
        UIs who want to show that we're doing a resync.
 
        """
 
        self._sender.cancelAll()
 
        # this should be locked so only one resync goes on at once
 
        return cyclone.httpclient.fetch(
 
            url="http://localhost:8051/graph",
 
            method="GET",
 
            headers={'Accept':'x-trig'},
 
            headers={'Accept':['x-trig']},
 
            ).addCallback(self._resyncGraph)
 

	
 
    def _resyncGraph(self, response):
 
        pass
 
        log.warn("new graph in")
 
        
 
        #diff against old entire graph
 
        #broadcast that change
 

	
 
    def patch(self, p):
 
        """send this patch to the server and apply it to our local
 
        graph and run handlers"""
 

	
 
        # these could fail if we're out of sync. One approach:
 
        # Rerequest the full state from the server, try the patch
 
        # again after that, then give up.
 
        log.info("del %s add %s", [q[2] for q in p.delQuads], [q[2] for q in  p.addQuads])
 
        try:
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        except ValueError:
 
            self.sendFailed(None)
 
            return
 
        self.runDepsOnNewPatch(p)
 
        self._sender.sendPatch(p).addErrback(self.sendFailed)
 

	
 
    def sendFailed(self, result):
 
        """
 
        we asked for a patch to be queued and sent to the master, and
 
        that ultimately failed because of a conflict
 
        """
 
        print "sendFailed"
 
        log.warn("sendFailed")
 
        self.resync()
 
        
 
        #i think we should receive back all the pending patches,
 
        #do a resysnc here,
 
        #do a resync here,
 
        #then requeue all the pending patches (minus the failing one?) after that's done.
 

	
 
    def _onPatch(self, p):
 
        """
 
        central server has sent us a patch
 
        """
0 comments (0 inline, 0 general)