Mercurial > code > home > repos > light9
view light9/rdfdb/patchsender.py @ 1395:b6ba0e7d126b
workaround for missing effects in songs
Ignore-this: 8433bee972cfc440067149c45e1ffab6
author | drewp@bigasterisk.com |
---|---|
date | Sun, 14 Jun 2015 20:26:36 +0000 |
parents | c50b7d6fcc4e |
children | 5bd2d036ae74 |
line wrap: on
line source
import logging, time import cyclone.httpclient from twisted.internet import defer log = logging.getLogger('syncedgraph') class PatchSender(object): """ SyncedGraph may generate patches faster than we can send them. This object buffers and may even collapse patches before they go the server """ def __init__(self, target, myUpdateResource): """ target is the URI we'll send patches to myUpdateResource is the URI for this sender of patches, which maybe needs to be the actual requestable update URI for sending updates back to us """ self.target = target self.myUpdateResource = myUpdateResource self._patchesToSend = [] self._currentSendPatchRequest = None def sendPatch(self, p): sendResult = defer.Deferred() self._patchesToSend.append((p, sendResult)) self._continueSending() return sendResult def cancelAll(self): self._patchesToSend[:] = [] # we might be in the middle of a post; ideally that would be # aborted, too. Since that's not coded yet, or it might be too late to # abort, what should happen? # 1. this could return deferred until we think our posts have stopped # 2. or, other code could deal for the fact that cancelAll # isn't perfect def _continueSending(self): if not self._patchesToSend or self._currentSendPatchRequest: return if len(self._patchesToSend) > 1: log.debug("%s patches left to send", len(self._patchesToSend)) # this is where we could concatenate little patches into a # bigger one. Often, many statements will cancel each # other out. not working yet: if 0: p = self._patchesToSend[0].concat(self._patchesToSend[1:]) print "concat down to" print 'dels' for q in p.delQuads: print q print 'adds' for q in p.addQuads: print q print "----" else: p, sendResult = self._patchesToSend.pop(0) else: p, sendResult = self._patchesToSend.pop(0) self._currentSendPatchRequest = sendPatch( self.target, p, senderUpdateUri=self.myUpdateResource) self._currentSendPatchRequest.addCallbacks(self._sendPatchDone, self._sendPatchErr) self._currentSendPatchRequest.chainDeferred(sendResult) def _sendPatchDone(self, result): self._currentSendPatchRequest = None self._continueSending() def _sendPatchErr(self, e): self._currentSendPatchRequest = None # we're probably out of sync with the master now, since # SyncedGraph.patch optimistically applied the patch to our # local graph already. What happens to this patch? What # happens to further pending patches? Some of the further # patches, especially, may be commutable with the bad one and # might still make sense to apply to the master graph. # if someday we are folding pending patches together, this # would be the time to UNDO that and attempt the original # separate patches again # this should screen for 409 conflict responses and raise a # special exception for that, so SyncedGraph.sendFailed can # screen for only that type # this code is going away; we're going to raise an exception that contains all the pending patches log.error("_sendPatchErr") log.error(e) self._continueSending() def sendPatch(putUri, patch, **kw): """ PUT a patch as json to an http server. Returns deferred. kwargs will become extra attributes in the toplevel json object """ t1 = time.time() body = patch.makeJsonRepr(kw) jsonTime = time.time() - t1 intro = body[:200] if len(body) > 200: intro = intro + "..." log.debug("send body (rendered %.1fkB in %.1fms): %s", len(body) / 1024, jsonTime * 1000, intro) def putDone(done): if not str(done.code).startswith('2'): raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body)) log.debug("sendPatch finished, response: %r" % done.body) return done return cyclone.httpclient.fetch( url=putUri, method='PUT', headers={'Content-Type': ['application/json']}, postdata=body, ).addCallback(putDone)