view light9/rdfdb/patchreceiver.py @ 843:77b5dbcf688d

split syncedgraph into several layers Ignore-this: ed978c899898f5fca08d9a68bee27cfb
author drewp@bigasterisk.com
date Tue, 26 Mar 2013 07:48:07 +0000
parents
children 51adfea492a5
line wrap: on
line source

import logging, cyclone.httpclient, traceback, urllib
from twisted.internet import reactor
from light9.rdfdb.rdflibpatch import patchQuads
from light9.rdfdb.patch import Patch
log = logging.getLogger('syncedgraph')

class PatchReceiver(object):
    """
    runs a web server in this process and registers it with the rdfdb
    master. See onPatch for what happens when the rdfdb master sends
    us a patch

    label: sent to the rdfdb master as the 'label' form field when
    registering.

    graph: the local graph that incoming patches are applied to (it
    is handed to patchQuads and to len()).

    initiallySynced: object whose .callback(None) is called once,
    after the first patch has been applied. Presumably a twisted
    Deferred -- confirm with callers. A false value means there is
    nothing to fire.
    """
    def __init__(self, label, graph, initiallySynced):
        self.graph = graph
        self.initiallySynced = initiallySynced

        # Port 0 is passed to listenTCP; the port that was actually
        # bound is read back below and reported to the master by
        # register().
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
            (r'/update', makePatchEndpoint(self.onPatch)),
        ]))
        # getHost() is the public IListeningPort way to learn the
        # bound address; it replaces the private _realPortNumber.
        port = listen.getHost().port
        self.updateResource = 'http://localhost:%s/update' % port
        log.info("listening on %s", port)
        self.register(label)

    def onPatch(self, p):
        """
        central server has sent us a patch

        Applies p.delQuads and p.addQuads to self.graph, then calls
        self.updateOnPatch(p), then fires initiallySynced if it has
        not been fired yet.
        """
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
        log.info("graph now has %s statements", len(self.graph))
        try:
            # NOTE(review): updateOnPatch is not defined on this class
            # in the code visible here; presumably a subclass or
            # mixin provides it -- confirm. Without it, every patch
            # ends up in the except branch below.
            self.updateOnPatch(p)
        except Exception:
            # don't reflect this back to the server; we did
            # receive its patch correctly.
            traceback.print_exc()

        if self.initiallySynced:
            # Clear the attribute before firing so that a failure or
            # a re-entrant onPatch during callback() cannot fire the
            # same object a second time.
            synced, self.initiallySynced = self.initiallySynced, None
            synced.callback(None)

    def register(self, label):
        """
        POST our update URL and label to the rdfdb master at
        http://localhost:8051/graphClients.

        The result of the fetch is not returned; success is logged at
        debug level and a failure is handed to log.error.
        """

        def done(x):
            log.debug("registered with rdfdb")

        cyclone.httpclient.fetch(
            url='http://localhost:8051/graphClients',
            method='POST',
            headers={'Content-Type': ['application/x-www-form-urlencoded']},
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
                                       ('label', label)]),
            ).addCallbacks(done, log.error)
        log.info("registering with rdfdb")

        
def makePatchEndpointPutMethod(cb):
    """
    Build the PUT method for a request handler class.

    The returned function parses the request body into a Patch, logs
    how many statements it deletes and adds, and passes the Patch to
    cb. Any exception is printed with traceback.print_exc() and then
    re-raised to the caller.
    """
    def put(self):
        try:
            incoming = Patch(jsonRepr=self.request.body)
            delCount = len(incoming.delGraph)
            addCount = len(incoming.addGraph)
            log.info("received patch -%d +%d" % (delCount, addCount))
            cb(incoming)
        except:
            # print the traceback here, then let the error continue
            # up to whoever invoked the handler
            traceback.print_exc()
            raise
    return put

def makePatchEndpoint(cb):
    """
    Return a cyclone.web.RequestHandler subclass whose put method is
    the function made by makePatchEndpointPutMethod(cb).
    """
    putHandler = makePatchEndpointPutMethod(cb)

    class Update(cyclone.web.RequestHandler):
        # PUT requests are handled by the function built above
        put = putHandler

    return Update