diff --git a/light9/rdfdb/autodepgraphapi.py b/light9/rdfdb/autodepgraphapi.py --- a/light9/rdfdb/autodepgraphapi.py +++ b/light9/rdfdb/autodepgraphapi.py @@ -41,7 +41,7 @@ class AutoDepGraphApi(object): finally: self.currentFuncs.pop() - def updateOnPatch(self, p): + def runDepsOnNewPatch(self, p): """ patch p just happened to the graph; call everyone back who might care, and then notice what data they depend on now diff --git a/light9/rdfdb/patchreceiver.py b/light9/rdfdb/patchreceiver.py --- a/light9/rdfdb/patchreceiver.py +++ b/light9/rdfdb/patchreceiver.py @@ -10,55 +10,34 @@ class PatchReceiver(object): master. See onPatch for what happens when the rdfdb master sends us a patch """ - def __init__(self, graph, label, initiallySynced): + def __init__(self, label, onPatch): """ label is what we'll call ourselves to the rdfdb server - initiallySynced is a deferred that we'll call back when we get - the first patch from the server + onPatch is what we call back when the server sends a patch """ - self.graph = graph - self.initiallySynced = initiallySynced - listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[ - (r'/update', makePatchEndpoint(self._onPatch)), + (r'/update', makePatchEndpoint(onPatch)), ])) port = listen._realPortNumber # what's the right call for this? self.updateResource = 'http://localhost:%s/update' % port log.info("listening on %s" % port) self._register(label) - def _onPatch(self, p): - """ - central server has sent us a patch - """ - patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True) - log.info("graph now has %s statements" % len(self.graph)) - try: - self.updateOnPatch(p) - except Exception: - # don't reflect this back to the server; we did - # receive its patch correctly. - traceback.print_exc() - - if self.initiallySynced: - self.initiallySynced.callback(None) - self.initiallySynced = None - def _register(self, label): - def done(x): - log.debug("registered with rdfdb") - cyclone.httpclient.fetch( url='http://localhost:8051/graphClients', method='POST', headers={'Content-Type': ['application/x-www-form-urlencoded']}, postdata=urllib.urlencode([('clientUpdate', self.updateResource), ('label', label)]), - ).addCallbacks(done, log.error) + ).addCallbacks(self._done, log.error) log.info("registering with rdfdb") + def _done(self, x): + log.debug("registered with rdfdb") + def makePatchEndpointPutMethod(cb): def put(self): diff --git a/light9/rdfdb/syncedgraph.py b/light9/rdfdb/syncedgraph.py --- a/light9/rdfdb/syncedgraph.py +++ b/light9/rdfdb/syncedgraph.py @@ -17,7 +17,7 @@ PatchSender - collects and transmits you """ from rdflib import ConjunctiveGraph -import logging, cyclone.httpclient +import logging, cyclone.httpclient, traceback from twisted.internet import defer log = logging.getLogger('syncedgraph') from light9.rdfdb.rdflibpatch import patchQuads @@ -57,9 +57,9 @@ class SyncedGraph(CurrentStateGraphApi, with your connection """ self.initiallySynced = defer.Deferred() - _graph = self._graph = ConjunctiveGraph() + self._graph = ConjunctiveGraph() - self._receiver = PatchReceiver(_graph, label, self.initiallySynced) + self._receiver = PatchReceiver(label, self._onPatch) self._sender = PatchSender('http://localhost:8051/patches', self._receiver.updateResource) @@ -99,7 +99,7 @@ class SyncedGraph(CurrentStateGraphApi, # again after that, then give up. log.info("%s add %s", [q[2] for q in p.delQuads], [q[2] for q in p.addQuads]) patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) - self.updateOnPatch(p) + self.runDepsOnNewPatch(p) self._sender.sendPatch(p).addErrback(self.sendFailed) def sendFailed(self, result): @@ -111,3 +111,22 @@ class SyncedGraph(CurrentStateGraphApi, #i think we should receive back all the pending patches, #do a resysnc here, #then requeue all the pending patches (minus the failing one?) after that's done. + + def _onPatch(self, p): + """ + central server has sent us a patch + """ + patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) + log.info("graph now has %s statements" % len(self._graph)) + try: + self.runDepsOnNewPatch(p) + except Exception: + # don't reflect this error back to the server; we did + # receive its patch correctly. However, we're in a bad + # state since some dependencies may not have rerun + traceback.print_exc() + log.warn("some dependencies may not have rerun") + + if self.initiallySynced: + self.initiallySynced.callback(None) + self.initiallySynced = None