Changeset - e09e3d1d83d5
[Not reviewed]
default
drewp@bigasterisk.com - 2013-06-08 01:57:30
slightly better behavior when the graph is out of sync. add LocalSubmaster type
3 files changed with 37 insertions and 12 deletions:
bin/subcomposer
 
@@ -88,76 +88,81 @@ class Subcomposer(tk.Frame):

    def setupSubChoiceLinks(self):
        graph = self.graph
        def ann():
            print "currently: session=%s currentSub=%r _currentChoice=%r" % (
                self.session, self.currentSub(), self._currentChoice())

        @graph.addHandler
        def graphChanged():
            s = graph.value(self.session, L9['currentSub'])
            if s is None:
                s = self.makeLocal()
            self.currentSub(Submaster.PersistentSubmaster(graph, s))

        @self.currentSub.subscribe
        def subChanged(newSub):
            if newSub is None:
                graph.patchObject(self.session,
                                  self.session, L9['currentSub'], None)
                return
            self.sendupdate()
            graph.patchObject(self.session,
                              self.session, L9['currentSub'], newSub.uri)

            with graph.currentState() as current:
                if newSub and (newSub.uri, RDF.type, L9['LocalSubmaster']) in current:
                    self._currentChoice(Local)
                else:
                    # i think right here is the point that the last local
                    # becomes garbage, and we could clean it up.
                    self._currentChoice(newSub.uri)

        dispatcher.connect(self.levelsChanged, "sub levels changed")

        @self._currentChoice.subscribe
        def choiceChanged(newChoice):
            if newChoice is Local:
                newChoice = self.makeLocal()
            if newChoice is not None:
                self.currentSub(Submaster.PersistentSubmaster(
                    graph, newChoice))

    def levelsChanged(self, sub):
        if sub == self.currentSub():
            self.sendupdate()

    def makeLocal(self):
        # todo: put a type on this, so subChanged can identify it right
        # todo: where will these get stored, or are they local to this
        # subcomposer process and don't use PersistentSubmaster at all?

        new = URIRef("http://local/%s" % time.time())
        self.graph.patch(Patch(addQuads=[
            (new, RDF.type, L9['Submaster'], self.session),
            (new, RDF.type, L9['LocalSubmaster'], self.session),
        ]))

        return new

    def setupLevelboxUi(self):
        self.levelbox = Levelbox(self, self.graph, self.currentSub)
        self.levelbox.pack(side='top')

        tk.Button(self, text="All to zero",
             command=lambda *args: self.currentSub().clear()).pack(side='top')

    def savenewsub(self, subname):
        leveldict={}
        for i,lev in zip(range(len(self.levels)),self.levels):
            if lev!=0:
                leveldict[get_channel_name(i+1)]=lev

        s=Submaster.Submaster(subname,leveldict=leveldict)
        s.save()

    def sendupdate(self):
        d = self.currentSub().get_dmx_list()
        dmxclient.outputlevels(d, twisted=True)
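The substantive change in this file replaces the previous URI-sniffing test ('local' in newSub.uri) with an rdf:type test against the new L9['LocalSubmaster'] class that makeLocal now asserts. Here is a minimal standalone sketch of that pattern using plain rdflib; the L9 namespace and session URI are local stand-ins, not the project's real bindings:

import time
from rdflib import ConjunctiveGraph, Namespace, RDF, URIRef

L9 = Namespace("http://light9.bigasterisk.com/ns/")  # stand-in namespace
session = URIRef("http://example/session/demo")      # stand-in session context
graph = ConjunctiveGraph()

def makeLocal(graph):
    # mirror makeLocal above: tag the new sub as both a Submaster
    # and a LocalSubmaster, in the session's context
    new = URIRef("http://local/%s" % time.time())
    ctx = graph.get_context(session)
    ctx.add((new, RDF.type, L9['Submaster']))
    ctx.add((new, RDF.type, L9['LocalSubmaster']))
    return new

sub = makeLocal(graph)
# the test subChanged now performs: ask the graph, don't sniff the URI
assert (sub, RDF.type, L9['LocalSubmaster']) in graph

The type test keeps working if the local-sub URI scheme ever changes, and it reads an explicit claim in the graph rather than a naming convention.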
 

	
 

	
light9/rdfdb/patchsender.py
 
@@ -6,48 +6,57 @@ log = logging.getLogger('syncedgraph')

class PatchSender(object):
    """
    SyncedGraph may generate patches faster than we can send
    them. This object buffers and may even collapse patches before
    they go to the server.
    """
    def __init__(self, target, myUpdateResource):
        """
        target is the URI we'll send patches to

        myUpdateResource is the URI for this sender of patches, which
        maybe needs to be the actual requestable update URI for
        sending updates back to us
        """
        self.target = target
        self.myUpdateResource = myUpdateResource
        self._patchesToSend = []
        self._currentSendPatchRequest = None

    def sendPatch(self, p):
        sendResult = defer.Deferred()
        self._patchesToSend.append((p, sendResult))
        self._continueSending()
        return sendResult

    def cancelAll(self):
        self._patchesToSend[:] = []
        # we might be in the middle of a post; ideally that would be
        # aborted, too. Since that's not coded yet, or it might be too
        # late to abort, what should happen?
        # 1. this could return a deferred until we think our posts have stopped
        # 2. or, other code could deal with the fact that cancelAll
        #    isn't perfect

    def _continueSending(self):
        if not self._patchesToSend or self._currentSendPatchRequest:
            return
        if len(self._patchesToSend) > 1:
            log.info("%s patches left to send", len(self._patchesToSend))
            # this is where we could concatenate little patches into a
            # bigger one. Often, many statements will cancel each
            # other out. not working yet:
            if 0:
                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
                print "concat down to"
                print 'dels'
                for q in p.delQuads: print q
                print 'adds'
                for q in p.addQuads: print q
                print "----"
            else:
                p, sendResult = self._patchesToSend.pop(0)
        else:
            p, sendResult = self._patchesToSend.pop(0)

        self._currentSendPatchRequest = sendPatch(
            self.target, p, senderUpdateUri=self.myUpdateResource)
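The dead if 0: branch above sketches an optimization that isn't wired up yet: when patches back up, concatenate them so that adds and deletes of the same quad cancel. (As written, the branch would also fail: _patchesToSend holds (patch, sendResult) tuples, so _patchesToSend[0] has no concat method, and sendResult would be left unbound.) A simplified model of the cancellation idea, not light9's actual Patch class:

class Patch(object):
    def __init__(self, addQuads=(), delQuads=()):
        self.addQuads = list(addQuads)
        self.delQuads = list(delQuads)

    def concat(self, others):
        # fold each later patch into this one, cancelling quads that
        # are added and then deleted (or deleted and then re-added)
        adds, dels = list(self.addQuads), list(self.delQuads)
        for p in others:
            for q in p.delQuads:
                if q in adds:
                    adds.remove(q)  # add then del: net no-op
                else:
                    dels.append(q)
            for q in p.addQuads:
                if q in dels:
                    dels.remove(q)  # del then add: net no-op
                else:
                    adds.append(q)
        return Patch(addQuads=adds, delQuads=dels)

q1 = ('s', 'p', 'o1', 'ctx')
q2 = ('s', 'p', 'o2', 'ctx')
combined = Patch(addQuads=[q1]).concat([Patch(delQuads=[q1], addQuads=[q2])])
assert combined.addQuads == [q2] and combined.delQuads == []

A real version would also have to aggregate the per-patch sendResult deferreds so every caller still gets its callback or errback.
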
light9/rdfdb/syncedgraph.py
 
@@ -43,90 +43,101 @@ class SyncedGraph(CurrentStateGraphApi, 

    You may want to attach to self.initiallySynced deferred so you
    don't attempt patches before we've heard the initial contents of
    the graph. It would be ok to accumulate some patches of new
    material, but usually you won't correctly remove the existing
    statements unless we have the correct graph.

    If we get out of sync, we abandon our local graph (even any
    pending local changes) and get the data again from the
    server.
    """
    def __init__(self, label):
        """
        label is a string that the server will display in association
        with your connection
        """
        self.initiallySynced = defer.Deferred()
        self._graph = ConjunctiveGraph()

        self._receiver = PatchReceiver(label, self._onPatch)

        self._sender = PatchSender('http://localhost:8051/patches',
                                   self._receiver.updateResource)
        AutoDepGraphApi.__init__(self)
        # this needs more state to track if we're doing a resync (and
        # everything has to error or wait) or if we're live

    def resync(self):
        """
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
        truth).

        To avoid too much churn, we remember our old graph and diff it
        against the replacement. This way, our callers only see the
        corrections.

        Edits you make during a resync will surely be lost, so I
        should just fail them. There should be a notification back to
        UIs that want to show that we're doing a resync.
        """
        self._sender.cancelAll()
        # this should be locked so only one resync goes on at once
        return cyclone.httpclient.fetch(
            url="http://localhost:8051/graph",
            method="GET",
            headers={'Accept':['x-trig']},
            ).addCallback(self._resyncGraph)

    def _resyncGraph(self, response):
        log.warn("new graph in")

        #diff against old entire graph
        #broadcast that change
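The resync docstring promises that callers only see the corrections, but _resyncGraph above still stubs that step out. A sketch of the missing diff, treating quads as hashable tuples; the commented lines showing how it might feed back into patchQuads and runDepsOnNewPatch are an assumption, not part of this changeset:

def diffForResync(oldQuads, newQuads):
    # quads the server no longer has must be deleted locally;
    # quads we were missing must be added
    old, new = set(oldQuads), set(newQuads)
    return old - new, new - old

# delQuads, addQuads = diffForResync(oldGraphQuads, fetchedGraphQuads)
# patchQuads(self._graph, delQuads, addQuads, perfect=True)
# self.runDepsOnNewPatch(Patch(addQuads=addQuads, delQuads=delQuads))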
 

	
 
    def patch(self, p):
        """send this patch to the server and apply it to our local
        graph and run handlers"""

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
        # again after that, then give up.
        log.info("del %s add %s", [q[2] for q in p.delQuads], [q[2] for q in p.addQuads])
        try:
            patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        except ValueError:
            self.sendFailed(None)
            return
        self.runDepsOnNewPatch(p)
        self._sender.sendPatch(p).addErrback(self.sendFailed)

    def sendFailed(self, result):
        """
        we asked for a patch to be queued and sent to the master, and
        that ultimately failed because of a conflict
        """
        log.warn("sendFailed")
        self.resync()

        #i think we should receive back all the pending patches,
        #do a resync here,
        #then requeue all the pending patches (minus the failing one?) after that's done.
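Those trailing comments describe a recovery plan this changeset doesn't implement. A method-shaped sketch of the plan as described; the failingPatch parameter and the peek at the sender's private queue are both assumptions for illustration:

def sendFailed(self, result, failingPatch=None):
    # assumed elaboration of the comments above: remember what was
    # queued, resync, then requeue everything but the failing patch
    log.warn("sendFailed")
    pending = [p for (p, _d) in self._sender._patchesToSend
               if p is not failingPatch]
    self._sender.cancelAll()
    return self.resync().addCallback(
        lambda _: [self.patch(p) for p in pending])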
 

	
 
    def _onPatch(self, p):
        """
        central server has sent us a patch
        """
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
        log.info("graph now has %s statements" % len(self._graph))
        try:
            self.runDepsOnNewPatch(p)
        except Exception:
            # don't reflect this error back to the server; we did
            # receive its patch correctly. However, we're in a bad
            # state since some dependencies may not have rerun
            traceback.print_exc()
            log.warn("some graph dependencies may not have completely run")

        if self.initiallySynced:
            self.initiallySynced.callback(None)
            self.initiallySynced = None
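Finally, a sketch of the attachment the class docstring recommends, so no patch is attempted before the initial graph contents arrive. SyncedGraph(label) and Patch(addQuads=[...]) appear in this changeset; the L9, session, and URIRef bindings are assumed, as in the subcomposer sketch earlier:

graph = SyncedGraph("subcomposer")

def onReady(_):
    # we now hold the server's statements, so this patch's deletes
    # (if any) can match real statements instead of tripping
    # patchQuads(perfect=True) and forcing a resync
    graph.patch(Patch(addQuads=[
        (URIRef("http://local/example"), RDF.type, L9['Submaster'], session),
    ]))

graph.initiallySynced.addCallback(onReady)

Note that _onPatch fires initiallySynced once and then sets it to None, so anything that subscribes later needs a different signal.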