Files
@ 5bd2d036ae74
Branch filter:
Location: light9/light9/rdfdb/patchsender.py
5bd2d036ae74
4.6 KiB
text/x-python
rdfdb log patch-to-client send times
Ignore-this: af10a7e205f559da6a7addd4a0ab93e6
Ignore-this: af10a7e205f559da6a7addd4a0ab93e6
import logging, time
import cyclone.httpclient
from twisted.internet import defer
log = logging.getLogger('syncedgraph')
class PatchSender(object):
    """
    SyncedGraph may generate patches faster than we can send
    them. This object buffers and may even collapse patches before
    they go to the server.

    Patches are sent one at a time, oldest first: the next request is
    only started after the previous one has succeeded or failed.
    """

    def __init__(self, target, myUpdateResource):
        """
        target is the URI we'll send patches to

        myUpdateResource is the URI for this sender of patches, which
        maybe needs to be the actual requestable update URI for
        sending updates back to us
        """
        self.target = target
        self.myUpdateResource = myUpdateResource
        # queue of (patch, sendResult deferred) pairs, oldest first
        self._patchesToSend = []
        # deferred of the request currently in flight, or None when idle
        self._currentSendPatchRequest = None

    def sendPatch(self, p):
        """
        Queue patch p and start sending if nothing is in flight.

        Returns a deferred for this particular patch. It fires with
        None once the patch has been sent successfully, and errbacks
        with the send failure otherwise.
        """
        sendResult = defer.Deferred()
        self._patchesToSend.append((p, sendResult))
        self._continueSending()
        return sendResult

    def cancelAll(self):
        """
        Drop every patch that is still waiting in the queue.

        The deferreds that sendPatch returned for the dropped patches
        are left unfired, and a request that is already in flight is
        not aborted.
        """
        self._patchesToSend[:] = []
        # we might be in the middle of a post; ideally that would be
        # aborted, too. Since that's not coded yet, or it might be too
        # late to abort, what should happen?
        # 1. this could return deferred until we think our posts have
        #    stopped
        # 2. or, other code could deal for the fact that cancelAll
        #    isn't perfect

    def _continueSending(self):
        """
        Start sending the oldest queued patch, unless the queue is
        empty or another request is still in flight.
        """
        if not self._patchesToSend or self._currentSendPatchRequest:
            return

        if len(self._patchesToSend) > 1:
            log.debug("%s patches left to send", len(self._patchesToSend))
            # this is where we could concatenate little patches into a
            # bigger one. Often, many statements will cancel each
            # other out. That is not implemented yet, so patches are
            # always sent one by one.

        p, sendResult = self._patchesToSend.pop(0)

        # Keep the request in a local: if it has already fired, the
        # callbacks added below run immediately and reset or replace
        # self._currentSendPatchRequest, and this patch's sendResult
        # must still be chained to this patch's own request.
        request = sendPatch(
            self.target, p, senderUpdateUri=self.myUpdateResource)
        self._currentSendPatchRequest = request
        request.addCallbacks(self._sendPatchDone, self._sendPatchErr)
        request.chainDeferred(sendResult)

    def _sendPatchDone(self, result):
        """Mark the sender idle and move on to the next queued patch."""
        self._currentSendPatchRequest = None
        self._continueSending()

    def _sendPatchErr(self, e):
        """
        Log a failed send, move on to the next queued patch, and hand
        the failure back so the patch's sendResult deferred errbacks.
        """
        self._currentSendPatchRequest = None
        # we're probably out of sync with the master now, since
        # SyncedGraph.patch optimistically applied the patch to our
        # local graph already. What happens to this patch? What
        # happens to further pending patches? Some of the further
        # patches, especially, may be commutable with the bad one and
        # might still make sense to apply to the master graph.

        # if someday we are folding pending patches together, this
        # would be the time to UNDO that and attempt the original
        # separate patches again

        # this should screen for 409 conflict responses and raise a
        # special exception for that, so SyncedGraph.sendFailed can
        # screen for only that type

        # this code is going away; we're going to raise an exception
        # that contains all the pending patches
        log.error("_sendPatchErr")
        log.error(e)
        self._continueSending()
        # returning the failure keeps the errback chain in the error
        # state; returning None here would report the failed send as
        # a success to whoever holds the sendResult deferred
        return e
def sendPatch(putUri, patch, **kw):
    """
    PUT a patch as json to an http server. Returns deferred.

    kwargs will become extra attributes in the toplevel json object

    The returned deferred fires with the response object when the
    response code's text starts with '2'; any other code is turned
    into a ValueError carrying the code and the response body.
    """
    t1 = time.time()
    body = patch.makeJsonRepr(kw)
    jsonTime = time.time() - t1

    # only the first 200 characters of the body go into the debug log
    intro = body[:200]
    if len(body) > 200:
        intro = intro + "..."
    # float divisor: with an int divisor python 2 truncates, so the
    # %.1f size would always be a whole number of kB
    log.debug("send body (rendered %.1fkB in %.1fms): %s",
              len(body) / 1024.0, jsonTime * 1000, intro)

    sendTime = time.time()

    def putDone(done):
        # runs when the PUT response arrives
        if not str(done.code).startswith('2'):
            raise ValueError("sendPatch request failed %s: %s" %
                             (done.code, done.body))
        dt = 1000 * (time.time() - sendTime)
        log.debug("sendPatch to %s took %sms, response: %r",
                  putUri, dt, done.body)
        return done

    return cyclone.httpclient.fetch(
        url=putUri,
        method='PUT',
        headers={'Content-Type': ['application/json']},
        postdata=body,
    ).addCallback(putDone)
|