changeset 843:77b5dbcf688d

split syncedgraph into several layers
author drewp@bigasterisk.com
date Tue, 26 Mar 2013 07:48:07 +0000
parents c0025b69a73f
children 51adfea492a5
files bin/rdfdb light9/rdfdb/autodepgraphapi.py light9/rdfdb/currentstategraphapi.py light9/rdfdb/grapheditapi.py light9/rdfdb/patchreceiver.py light9/rdfdb/patchsender.py light9/rdfdb/syncedgraph.py
diffstat 7 files changed, 471 insertions(+), 401 deletions(-)
--- a/bin/rdfdb	Tue Mar 26 07:08:44 2013 +0000
+++ b/bin/rdfdb	Tue Mar 26 07:48:07 2013 +0000
@@ -119,7 +119,9 @@
 from light9.rdfdb.graphfile import GraphFile
 from light9.rdfdb.patch import Patch, ALLSTMTS
 from light9.rdfdb.rdflibpatch import patchQuads
-from light9.rdfdb import syncedgraph
+
+from light9.rdfdb.patchsender import sendPatch
+from light9.rdfdb.patchreceiver import makePatchEndpointPutMethod
 
 from twisted.internet.inotify import INotify
 from run_local import log
@@ -155,7 +157,7 @@
             delQuads=[]))
 
     def sendPatch(self, p):
-        return syncedgraph.sendPatch(self.updateUri, p)
+        return sendPatch(self.updateUri, p)
 
 class WatchedFiles(object):
     """
@@ -340,7 +342,7 @@
 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
     def __init__(self, *args, **kw):
         cyclone.web.RequestHandler.__init__(self, *args, **kw)
-        p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch)
+        p = makePatchEndpointPutMethod(self.settings.db.patch)
         self.put = lambda: p(self)
 
     def get(self):
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/autodepgraphapi.py	Tue Mar 26 07:48:07 2013 +0000
@@ -0,0 +1,158 @@
+import logging
+from rdflib import RDF, RDFS
+from light9.rdfdb.currentstategraphapi import contextsForStatementNoWildcards
+log = logging.getLogger('syncedgraph')
+
+class AutoDepGraphApi(object):
+    """
+    mixin for SyncedGraph, separated here because these methods work together
+    """
+
+    def __init__(self):
+        self._watchers = GraphWatchers()
+        self.currentFuncs = [] # stack of addHandler callers
+    
+    def addHandler(self, func):
+        """
+        run this (idempotent) func, noting what graph values it
+        uses. Run it again in the future if there are changes to those
+        graph values. The func might use different values during that
+        future call, and those will be what we watch for next.
+        """
+
+        # if we saw this func before, we need to forget the old
+        # callbacks it wanted and replace with the new ones we see
+        # now.
+
+        # if one handler func calls another, does that break anything?
+        # maybe not?
+
+        # no plan for sparql queries yet. Hook into a lower layer that
+        # reveals all their statement fetches? Just make them always
+        # new? Cache their results, so if I make the query again and
+        # it gives the same result, I don't call the handler?
+
+        self.currentFuncs.append(func)
+        try:
+            func()
+        finally:
+            self.currentFuncs.pop()
+
+    def updateOnPatch(self, p):
+        """
+        patch p just happened to the graph; call everyone back who
+        might care, and then notice what data they depend on now
+        """
+        for func in self._watchers.whoCares(p):
+            # todo: forget the old handlers for this func
+            self.addHandler(func)
+
+    def _getCurrentFunc(self):
+        if not self.currentFuncs:
+            # this may become a warning later
+            raise ValueError("asked for graph data outside of a handler")
+
+        # we add the watcher to the deepest function, since that
+        # should be the cheapest way to update when this part of the
+        # data changes
+        return self.currentFuncs[-1]
+
+    # these just call through to triples() so it might be possible to
+    # watch just that one.
+
+    # if you get a bnode in your response, maybe the answer to
+    # dependency tracking is to say that you depended on the triple
+    # that got you that bnode, since it is likely to change to another
+    # bnode later. This won't work if the receiver stores bnodes
+    # between calls, but probably most of them don't do that (they
+    # work from a starting uri)
+
+    def value(self, subject=None, predicate=RDF.value, object=None,
+              default=None, any=True):
+        if object is not None:
+            raise NotImplementedError()
+        func = self._getCurrentFunc()
+        self._watchers.addSubjPredWatcher(func, subject, predicate)
+        return self._graph.value(subject, predicate, object=object,
+                                 default=default, any=any)
+
+    def objects(self, subject=None, predicate=None):
+        func = self._getCurrentFunc()
+        self._watchers.addSubjPredWatcher(func, subject, predicate)
+        return self._graph.objects(subject, predicate)
+
+    def label(self, uri):
+        return self.value(uri, RDFS.label)
+
+    def subjects(self, predicate=None, object=None):
+        func = self._getCurrentFunc()
+        self._watchers.addPredObjWatcher(func, predicate, object)
+        return self._graph.subjects(predicate, object)
+
+    def contextsForStatement(self, triple):
+        """currently this needs to be in an addHandler section, but it
+        sets no watchers so it won't actually update if the statement
+        was added or dropped from contexts"""
+        func = self._getCurrentFunc()
+        return contextsForStatementNoWildcards(self._graph, triple)
+
+    # I find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
+    # you only what results have just appeared or disappeared. I think
+    # I'm going to be repeating that logic a lot. Maybe just for the
+    # subjects(RDF.type, t) call
+
+
+class GraphWatchers(object):
+    """
+    store the current handlers that care about graph changes
+    """
+    def __init__(self):
+        self._handlersSp = {} # (s,p): set(handlers)
+        self._handlersPo = {} # (p,o): set(handlers)
+
+    def addSubjPredWatcher(self, func, s, p):
+        if func is None:
+            return
+        key = s, p
+        try:
+            self._handlersSp.setdefault(key, set()).add(func)
+        except Exception:
+            log.error("with key %r and func %r" % (key, func))
+            raise
+
+    def addPredObjWatcher(self, func, p, o):
+        self._handlersPo.setdefault((p, o), set()).add(func)
+
+    def whoCares(self, patch):
+        """what handler functions would care about the changes in this patch?
+
+        This forgets the handlers that it returns; updateOnPatch
+        re-registers them when it reruns them.
+        """
+        #self.dependencies()
+        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
+                                [(s, p) for s, p, o, c in patch.delQuads])
+        affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+
+                                [(p, o) for s, p, o, c in patch.delQuads])
+
+        ret = set()
+        for (s, p), funcs in self._handlersSp.iteritems():
+            if (s, p) in affectedSubjPreds:
+                ret.update(funcs)
+                funcs.clear()
+
+        for (p, o), funcs in self._handlersPo.iteritems():
+            if (p, o) in affectedPredObjs:
+                ret.update(funcs)
+                funcs.clear()
+
+        return ret
+
+    def dependencies(self):
+        """
+        for debugging, make a list of all the active handlers and what
+        data they depend on. This is meant for showing on the web ui
+        for browsing.
+        """
+        log.info("whocares:")
+        from pprint import pprint
+        pprint(self._handlersSp)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/currentstategraphapi.py	Tue Mar 26 07:48:07 2013 +0000
@@ -0,0 +1,35 @@
+from rdflib import ConjunctiveGraph
+from light9.rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
+
+class CurrentStateGraphApi(object):
+    
+    def currentState(self, context=None):
+        """
+        a graph you can read without being in an addHandler
+        """
+        if context is not None:
+            raise NotImplementedError("currentState with context arg")
+
+        class Mgr(object):
+            def __enter__(self2):
+                # this should be a readonly view of the existing
+                # graph, maybe with something to guard against
+                # writes/patches happening while reads are being
+                # done. Typical usage will do some reads on this graph
+                # before moving on to writes.
+                
+                g = ConjunctiveGraph()
+                for s,p,o,c in self._graph.quads((None,None,None)):
+                    g.store.add((s,p,o), c)
+                g.contextsForStatement = lambda t: contextsForStatementNoWildcards(g, t)
+                return g
+
+            def __exit__(self, type, val, tb):
+                return
+
+        return Mgr()
+
+def contextsForStatementNoWildcards(g, triple):
+    if None in triple:
+        raise NotImplementedError("no wildcards")
+    return rp_contextsForStatement(g, triple)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/grapheditapi.py	Tue Mar 26 07:48:07 2013 +0000
@@ -0,0 +1,73 @@
+import random
+from itertools import chain
+from rdflib import URIRef, RDF
+from light9.rdfdb.patch import Patch
+
+class GraphEditApi(object):
+    """
+    mixin for SyncedGraph, separated here because these methods work together
+    """
+
+    def patchObject(self, context, subject, predicate, newObject):
+        """send a patch which removes existing values for (s,p,*,c)
+        and adds (s,p,newObject,c). Values in other graphs are not affected.
+
+        newObject can be None, which will remove all (subj,pred,*) statements.
+        """
+
+        existing = []
+        for spo in self._graph.triples((subject, predicate, None),
+                                     context=context):
+            existing.append(spo+(context,))
+        # what layer is supposed to cull out no-op changes?
+        self.patch(Patch(
+            delQuads=existing,
+            addQuads=([(subject, predicate, newObject, context)]
+                      if newObject is not None else [])))
+
+    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
+        """
+        creates/updates a structure like this:
+
+           ?subject ?predicate [
+             a ?nodeClass;
+             ?keyPred ?newKey;
+             ?valuePred ?newValue ] .
+
+        There should be a complementary readMapping that gets a value
+        back out, since that lookup is tricky too.
+        """
+
+        with self.currentState() as graph:
+            adds = set([])
+            for setting in graph.objects(subject, predicate):
+                if graph.value(setting, keyPred) == newKey:
+                    break
+            else:
+                setting = URIRef(subject + "/map/%s" %
+                                 random.randrange(999999999))
+                adds.update([
+                    (subject, predicate, setting, context),
+                    (setting, RDF.type, nodeClass, context),
+                    (setting, keyPred, newKey, context),
+                    ])
+            dels = set([])
+            for prev in graph.objects(setting, valuePred):
+                dels.add((setting, valuePred, prev, context))
+            adds.add((setting, valuePred, newValue, context))
+
+            if adds != dels:
+                self.patch(Patch(delQuads=dels, addQuads=adds))
+
+    def removeMappingNode(self, context, node):
+        """
+        removes the statements with this node as subject or object,
+        which is the right set of statements to clean up a node that
+        patchMapping made.
+        """
+        p = Patch(delQuads=[spo+(context,) for spo in
+                            chain(self._graph.triples((None, None, node),
+                                                      context=context),
+                                  self._graph.triples((node, None, None),
+                                                      context=context))])
+        self.patch(p)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/patchreceiver.py	Tue Mar 26 07:48:07 2013 +0000
@@ -0,0 +1,71 @@
+import logging, cyclone.httpclient, traceback, urllib
+from twisted.internet import reactor
+from light9.rdfdb.rdflibpatch import patchQuads
+from light9.rdfdb.patch import Patch
+log = logging.getLogger('syncedgraph')
+
+class PatchReceiver(object):
+    """
+    runs a web server in this process and registers it with the rdfdb
+    master. See onPatch for what happens when the rdfdb master sends
+    us a patch
+    """
+    def __init__(self, label, graph, initiallySynced):
+        self.graph = graph
+        self.initiallySynced = initiallySynced
+        
+        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
+            (r'/update', makePatchEndpoint(self.onPatch)),
+        ]))
+        port = listen._realPortNumber  # getHost().port is the public way to read this
+        self.updateResource = 'http://localhost:%s/update' % port
+        log.info("listening on %s" % port)
+        self.register(label)
+
+    def onPatch(self, p):
+        """
+        central server has sent us a patch
+        """
+        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
+        log.info("graph now has %s statements" % len(self.graph))
+        try:
+            self.updateOnPatch(p)
+        except Exception:
+            # don't reflect this back to the server; we did
+            # receive its patch correctly.
+            traceback.print_exc()
+
+        if self.initiallySynced:
+            self.initiallySynced.callback(None)
+            self.initiallySynced = None
+
+    def register(self, label):
+
+        def done(x):
+            log.debug("registered with rdfdb")
+
+        cyclone.httpclient.fetch(
+            url='http://localhost:8051/graphClients',
+            method='POST',
+            headers={'Content-Type': ['application/x-www-form-urlencoded']},
+            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
+                                       ('label', label)]),
+            ).addCallbacks(done, log.error)
+        log.info("registering with rdfdb")
+
+        
+def makePatchEndpointPutMethod(cb):
+    def put(self):
+        try:
+            p = Patch(jsonRepr=self.request.body)
+            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
+            cb(p)
+        except:
+            traceback.print_exc()
+            raise
+    return put
+
+def makePatchEndpoint(cb):
+    class Update(cyclone.web.RequestHandler):
+        put = makePatchEndpointPutMethod(cb)
+    return Update
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/patchsender.py	Tue Mar 26 07:48:07 2013 +0000
@@ -0,0 +1,95 @@
+import logging
+import cyclone.httpclient
+from twisted.internet import defer
+log = logging.getLogger('syncedgraph')
+
+def sendPatch(putUri, patch, **kw):
+    """
+    kwargs will become extra attributes in the toplevel json object
+    """
+    body = patch.makeJsonRepr(kw)
+    log.debug("send body: %r", body)
+    def ok(done):
+        if not str(done.code).startswith('2'):
+            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
+        log.debug("sendPatch finished, response: %s" % done.body)
+        return done
+
+    return cyclone.httpclient.fetch(
+        url=putUri,
+        method='PUT',
+        headers={'Content-Type': ['application/json']},
+        postdata=body,
+        ).addCallback(ok)
+
+class PatchSender(object):
+    """
+    SyncedGraph may generate patches faster than we can send
+    them. This object buffers and may even collapse patches before
+    they go to the server
+    """
+    def __init__(self, target, myUpdateResource):
+        self.target = target
+        self.myUpdateResource = myUpdateResource
+        self._patchesToSend = []
+        self._currentSendPatchRequest = None
+
+    def sendPatch(self, p):
+        sendResult = defer.Deferred()
+        self._patchesToSend.append((p, sendResult))
+        self._continueSending()
+        return sendResult
+
+    def _continueSending(self):
+        if not self._patchesToSend or self._currentSendPatchRequest:
+            return
+        if len(self._patchesToSend) > 1:
+            log.info("%s patches left to send", len(self._patchesToSend))
+            # this is where we could concatenate little patches into a
+            # bigger one. Often, many statements will cancel each
+            # other out. not working yet:
+            if 0:
+                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
+                print "concat down to"
+                print 'dels'
+                for q in p.delQuads: print q
+                print 'adds'
+                for q in p.addQuads: print q
+                print "----"
+            else:
+                p, sendResult = self._patchesToSend.pop(0)
+        else:
+            p, sendResult = self._patchesToSend.pop(0)
+
+        self._currentSendPatchRequest = sendPatch(
+            self.target, p, senderUpdateUri=self.myUpdateResource)
+        self._currentSendPatchRequest.addCallbacks(self._sendPatchDone,
+                                                   self._sendPatchErr)
+        self._currentSendPatchRequest.chainDeferred(sendResult)
+
+    def _sendPatchDone(self, result):
+        self._currentSendPatchRequest = None
+        self._continueSending()
+
+    def _sendPatchErr(self, e):
+        self._currentSendPatchRequest = None
+        # we're probably out of sync with the master now, since
+        # SyncedGraph.patch optimistically applied the patch to our
+        # local graph already. What happens to this patch? What
+        # happens to further pending patches? Some of the further
+        # patches, especially, may be commutable with the bad one and
+        # might still make sense to apply to the master graph.
+
+        # if someday we are folding pending patches together, this
+        # would be the time to UNDO that and attempt the original
+        # separate patches again
+
+        # this should screen for 409 conflict responses and raise a
+        # special exception for that, so SyncedGraph.sendFailed can
+        # screen for only that type
+
+        # this code is going away; we're going to raise an exception that contains all the pending patches
+        log.error("_sendPatchErr")
+        log.error(e)
+        self._continueSending()
+
--- a/light9/rdfdb/syncedgraph.py	Tue Mar 26 07:08:44 2013 +0000
+++ b/light9/rdfdb/syncedgraph.py	Tue Mar 26 07:48:07 2013 +0000
@@ -1,180 +1,39 @@
-from rdflib import ConjunctiveGraph, RDFS, RDF, URIRef
-import logging, cyclone.httpclient, traceback, urllib, random
-from itertools import chain
-from twisted.internet import reactor, defer
+"""
+client code uses a SyncedGraph, which has a few things:
+
+AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
+way that lets us call your handler again when there are changes to the
+things it queried
+
+CurrentStateGraphApi - a way to query the graph that doesn't gather
+your dependencies like AutoDepGraphApi does
+
+GraphEditApi - methods to write patches to the graph for common
+operations, e.g. replacing a value, or editing a mapping
+
+PatchReceiver - our web server that listens to edits from the master graph
+
+PatchSender - collects and transmits your graph edits
+"""
+
+from rdflib import ConjunctiveGraph
+import logging, cyclone.httpclient
+from twisted.internet import defer
 log = logging.getLogger('syncedgraph')
-from light9.rdfdb.patch import Patch
-from light9.rdfdb.rdflibpatch import patchQuads, contextsForStatement as rp_contextsForStatement
+from light9.rdfdb.rdflibpatch import patchQuads
+
+from light9.rdfdb.patchsender import PatchSender
+from light9.rdfdb.patchreceiver import PatchReceiver
+from light9.rdfdb.currentstategraphapi import CurrentStateGraphApi
+from light9.rdfdb.autodepgraphapi import AutoDepGraphApi
+from light9.rdfdb.grapheditapi import GraphEditApi
 
 # everybody who writes literals needs to get this
 from rdflibpatch_literal import patch
 patch()
 
 
-def sendPatch(putUri, patch, **kw):
-    """
-    kwargs will become extra attributes in the toplevel json object
-    """
-    body = patch.makeJsonRepr(kw)
-    log.debug("send body: %r", body)
-    def ok(done):
-        if not str(done.code).startswith('2'):
-            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
-        log.debug("sendPatch finished, response: %s" % done.body)
-        return done
-
-    return cyclone.httpclient.fetch(
-        url=putUri,
-        method='PUT',
-        headers={'Content-Type': ['application/json']},
-        postdata=body,
-        ).addCallback(ok)
-
-def makePatchEndpointPutMethod(cb):
-    def put(self):
-        try:
-            p = Patch(jsonRepr=self.request.body)
-            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
-            cb(p)
-        except:
-            traceback.print_exc()
-            raise
-    return put
-
-def makePatchEndpoint(cb):
-    class Update(cyclone.web.RequestHandler):
-        put = makePatchEndpointPutMethod(cb)
-    return Update
-
-class GraphWatchers(object):
-    """
-    store the current handlers that care about graph changes
-    """
-    def __init__(self):
-        self._handlersSp = {} # (s,p): set(handlers)
-        self._handlersPo = {} # (p,o): set(handlers)
-
-    def addSubjPredWatcher(self, func, s, p):
-        if func is None:
-            return
-        key = s, p
-        try:
-            self._handlersSp.setdefault(key, set()).add(func)
-        except Exception:
-            log.error("with key %r and func %r" % (key, func))
-            raise
-
-    def addPredObjWatcher(self, func, p, o):
-        self._handlersPo.setdefault((p, o), set()).add(func)
-
-    def whoCares(self, patch):
-        """what handler functions would care about the changes in this patch?
-
-        this removes the handlers that it gives you
-        """
-        #self.dependencies()
-        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
-                                [(s, p) for s, p, o, c in patch.delQuads])
-        affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+
-                                [(p, o) for s, p, o, c in patch.delQuads])
-
-        ret = set()
-        for (s, p), funcs in self._handlersSp.iteritems():
-            if (s, p) in affectedSubjPreds:
-                ret.update(funcs)
-                funcs.clear()
-
-        for (p, o), funcs in self._handlersPo.iteritems():
-            if (p, o) in affectedPredObjs:
-                ret.update(funcs)
-                funcs.clear()
-
-        return ret
-
-    def dependencies(self):
-        """
-        for debugging, make a list of all the active handlers and what
-        data they depend on. This is meant for showing on the web ui
-        for browsing.
-        """
-        log.info("whocares:")
-        from pprint import pprint
-        pprint(self._handlersSp)
-
-
-class PatchSender(object):
-    """
-    SyncedGraph may generate patches faster than we can send
-    them. This object buffers and may even collapse patches before
-    they go the server
-    """
-    def __init__(self, target, myUpdateResource):
-        self.target = target
-        self.myUpdateResource = myUpdateResource
-        self._patchesToSend = []
-        self._currentSendPatchRequest = None
-
-    def sendPatch(self, p):
-        sendResult = defer.Deferred()
-        self._patchesToSend.append((p, sendResult))
-        self._continueSending()
-        return sendResult
-
-    def _continueSending(self):
-        if not self._patchesToSend or self._currentSendPatchRequest:
-            return
-        if len(self._patchesToSend) > 1:
-            log.info("%s patches left to send", len(self._patchesToSend))
-            # this is where we could concatenate little patches into a
-            # bigger one. Often, many statements will cancel each
-            # other out. not working yet:
-            if 0:
-                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
-                print "concat down to"
-                print 'dels'
-                for q in p.delQuads: print q
-                print 'adds'
-                for q in p.addQuads: print q
-                print "----"
-            else:
-                p, sendResult = self._patchesToSend.pop(0)
-        else:
-            p, sendResult = self._patchesToSend.pop(0)
-
-        self._currentSendPatchRequest = sendPatch(
-            self.target, p, senderUpdateUri=self.myUpdateResource)
-        self._currentSendPatchRequest.addCallbacks(self._sendPatchDone,
-                                                   self._sendPatchErr)
-        self._currentSendPatchRequest.chainDeferred(sendResult)
-
-    def _sendPatchDone(self, result):
-        self._currentSendPatchRequest = None
-        self._continueSending()
-
-    def _sendPatchErr(self, e):
-        self._currentSendPatchRequest = None
-        # we're probably out of sync with the master now, since
-        # SyncedGraph.patch optimistically applied the patch to our
-        # local graph already. What happens to this patch? What
-        # happens to further pending patches? Some of the further
-        # patches, especially, may be commutable with the bad one and
-        # might still make sense to apply to the master graph.
-
-        # if someday we are folding pending patches together, this
-        # would be the time to UNDO that and attempt the original
-        # separate patches again
-
-        # this should screen for 409 conflict responses and raise a
-        # special exception for that, so SyncedGraph.sendFailed can
-        # screen for only that type
-
-        # this code is going away; we're going to raise an exception that contains all the pending patches
-        log.error("_sendPatchErr")
-        log.error(e)
-        self._continueSending()
-
-
-class SyncedGraph(object):
+class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
     """
     graph for clients to use. Changes are synced with the master graph
     in the rdfdb process.
@@ -199,37 +58,13 @@
         """
         self.initiallySynced = defer.Deferred()
         _graph = self._graph = ConjunctiveGraph()
-        self._watchers = GraphWatchers()
 
-        def onPatch(p):
-            """
-            central server has sent us a patch
-            """
-            patchQuads(_graph, p.delQuads, p.addQuads, perfect=True)
-            log.info("graph now has %s statements" % len(_graph))
-            try:
-                self.updateOnPatch(p)
-            except Exception:
-                # don't reflect this back to the server; we did
-                # receive its patch correctly.
-                traceback.print_exc()
-
-            if self.initiallySynced:
-                self.initiallySynced.callback(None)
-                self.initiallySynced = None
-
-
-        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
-            (r'/update', makePatchEndpoint(onPatch)),
-        ]))
-        port = listen._realPortNumber  # what's the right call for this?
-        self.updateResource = 'http://localhost:%s/update' % port
-        log.info("listening on %s" % port)
-        self.register(label)
-        self.currentFuncs = [] # stack of addHandler callers
+        self._receiver = PatchReceiver(label, _graph, self.initiallySynced)
+        
         self._sender = PatchSender('http://localhost:8051/patches',
-                                   self.updateResource)
-
+                                   self._receiver.updateResource)
+        AutoDepGraphApi.__init__(self)
+        
     def resync(self):
         """
         get the whole graph again from the server (e.g. we had a
@@ -255,20 +90,6 @@
         #diff against old entire graph
         #broadcast that change
 
-    def register(self, label):
-
-        def done(x):
-            log.debug("registered with rdfdb")
-
-        cyclone.httpclient.fetch(
-            url='http://localhost:8051/graphClients',
-            method='POST',
-            headers={'Content-Type': ['application/x-www-form-urlencoded']},
-            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
-                                       ('label', label)]),
-            ).addCallbacks(done, log.error)
-        log.info("registering with rdfdb")
-
     def patch(self, p):
         """send this patch to the server and apply it to our local
         graph and run handlers"""
@@ -290,188 +111,3 @@
         #i think we should receive back all the pending patches,
         #do a resysnc here,
         #then requeue all the pending patches (minus the failing one?) after that's done.
-
-
-    def patchObject(self, context, subject, predicate, newObject):
-        """send a patch which removes existing values for (s,p,*,c)
-        and adds (s,p,newObject,c). Values in other graphs are not affected.
-
-        newObject can be None, which will remove all (subj,pred,*) statements.
-        """
-
-        existing = []
-        for spo in self._graph.triples((subject, predicate, None),
-                                     context=context):
-            existing.append(spo+(context,))
-        # what layer is supposed to cull out no-op changes?
-        self.patch(Patch(
-            delQuads=existing,
-            addQuads=([(subject, predicate, newObject, context)]
-                      if newObject is not None else [])))
-
-    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
-        """
-        creates/updates a structure like this:
-
-           ?subject ?predicate [
-             a ?nodeClass;
-             ?keyPred ?newKey;
-             ?valuePred ?newValue ] .
-
-        There should be a complementary readMapping that gets you a
-        value since that's tricky too
-        """
-
-        with self.currentState() as graph:
-            adds = set([])
-            for setting in graph.objects(subject, predicate):
-                if graph.value(setting, keyPred) == newKey:
-                    break
-            else:
-                setting = URIRef(subject + "/map/%s" %
-                                 random.randrange(999999999))
-                adds.update([
-                    (subject, predicate, setting, context),
-                    (setting, RDF.type, nodeClass, context),
-                    (setting, keyPred, newKey, context),
-                    ])
-            dels = set([])
-            for prev in graph.objects(setting, valuePred):
-                dels.add((setting, valuePred, prev, context))
-            adds.add((setting, valuePred, newValue, context))
-
-            if adds != dels:
-                self.patch(Patch(delQuads=dels, addQuads=adds))
-
-    def removeMappingNode(self, context, node):
-        """
-        removes the statements with this node as subject or object, which
-        is the right amount of statements to remove a node that
-        patchMapping made.
-        """
-        p = Patch(delQuads=[spo+(context,) for spo in
-                            chain(self._graph.triples((None, None, node),
-                                                      context=context),
-                                  self._graph.triples((node, None, None),
-                                                      context=context))])
-        self.patch(p)
-                
-    def addHandler(self, func):
-        """
-        run this (idempotent) func, noting what graph values it
-        uses. Run it again in the future if there are changes to those
-        graph values. The func might use different values during that
-        future call, and those will be what we watch for next.
-        """
-
-        # if we saw this func before, we need to forget the old
-        # callbacks it wanted and replace with the new ones we see
-        # now.
-
-        # if one handler func calls another, does that break anything?
-        # maybe not?
-
-        # no plan for sparql queries yet. Hook into a lower layer that
-        # reveals all their statement fetches? Just make them always
-        # new? Cache their results, so if i make the query again and
-        # it gives the same result, I don't call the handler?
-
-        self.currentFuncs.append(func)
-        try:
-            func()
-        finally:
-            self.currentFuncs.pop()
-
-    def updateOnPatch(self, p):
-        """
-        patch p just happened to the graph; call everyone back who
-        might care, and then notice what data they depend on now
-        """
-        for func in self._watchers.whoCares(p):
-            # todo: forget the old handlers for this func
-            self.addHandler(func)
-
-    def currentState(self, context=None):
-        """
-        a graph you can read without being in an addHandler
-        """
-        if context is not None:
-            raise NotImplementedError("currentState with context arg")
-
-        class Mgr(object):
-            def __enter__(self2):
-                # this should be a readonly view of the existing
-                # graph, maybe with something to guard against
-                # writes/patches happening while reads are being
-                # done. Typical usage will do some reads on this graph
-                # before moving on to writes.
-                
-                g = ConjunctiveGraph()
-                for s,p,o,c in self._graph.quads((None,None,None)):
-                    g.store.add((s,p,o), c)
-                g.contextsForStatement = lambda t: contextsForStatementNoWildcards(g, t)
-                return g
-
-            def __exit__(self, type, val, tb):
-                return
-
-        return Mgr()
-
-    def _getCurrentFunc(self):
-        if not self.currentFuncs:
-            # this may become a warning later
-            raise ValueError("asked for graph data outside of a handler")
-
-        # we add the watcher to the deepest function, since that
-        # should be the cheapest way to update when this part of the
-        # data changes
-        return self.currentFuncs[-1]
-
-    # these just call through to triples() so it might be possible to
-    # watch just that one.
-
-    # if you get a bnode in your response, maybe the answer to
-    # dependency tracking is to say that you depended on the triple
-    # that got you that bnode, since it is likely to change to another
-    # bnode later. This won't work if the receiver stores bnodes
-    # between calls, but probably most of them don't do that (they
-    # work from a starting uri)
-
-    def value(self, subject=None, predicate=RDF.value, object=None,
-              default=None, any=True):
-        if object is not None:
-            raise NotImplementedError()
-        func = self._getCurrentFunc()
-        self._watchers.addSubjPredWatcher(func, subject, predicate)
-        return self._graph.value(subject, predicate, object=object,
-                                 default=default, any=any)
-
-    def objects(self, subject=None, predicate=None):
-        func = self._getCurrentFunc()
-        self._watchers.addSubjPredWatcher(func, subject, predicate)
-        return self._graph.objects(subject, predicate)
-
-    def label(self, uri):
-        return self.value(uri, RDFS.label)
-
-    def subjects(self, predicate=None, object=None):
-        func = self._getCurrentFunc()
-        self._watchers.addPredObjWatcher(func, predicate, object)
-        return self._graph.subjects(predicate, object)
-
-    def contextsForStatement(self, triple):
-        """currently this needs to be in an addHandler section, but it
-        sets no watchers so it won't actually update if the statement
-        was added or dropped from contexts"""
-        func = self._getCurrentFunc()
-        return contextsForStatementNoWildcards(self._graph, triple)
-
-    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
-    # you only what results have just appeared or disappeared. I think
-    # I'm going to be repeating that logic a lot. Maybe just for the
-    # subjects(RDF.type, t) call
-
-def contextsForStatementNoWildcards(g, triple):
-    if None in triple:
-        raise NotImplementedError("no wildcards")
-    return rp_contextsForStatement(g, triple)