changeset 797:904913de4599

deletes are now quads. refactor files. named clients. auto client port Ignore-this: 44f83643c28cbb0f961e2c8c1267d398
author drewp@bigasterisk.com
date Fri, 13 Jul 2012 19:25:03 +0000
parents 37d05bd17b10
children 5c158d37f1ce
files bin/clientdemo bin/rdfdb light9/rdfdb.py light9/rdfdb.xhtml light9/rdfdb/__init__.py light9/rdfdb/graphfile.py light9/rdfdb/patch.py light9/rdfdb/syncedgraph.py
diffstat 7 files changed, 313 insertions(+), 276 deletions(-) [+]
line wrap: on
line diff
--- a/bin/clientdemo	Fri Jul 13 18:25:34 2012 +0000
+++ b/bin/clientdemo	Fri Jul 13 19:25:03 2012 +0000
@@ -5,14 +5,14 @@
 from twisted.internet import reactor
 import cyclone.web, cyclone.httpclient, logging
 from rdflib import Namespace, Literal
-from light9 import rdfdb
+from light9.rdfdb.patch import Patch
+from light9.rdfdb.syncedgraph import SyncedGraph
 
 if __name__ == "__main__":
     logging.basicConfig(level=logging.DEBUG)
     log = logging.getLogger()
 
-    port = 8052
-    g = rdfdb.SyncedGraph(port)
+    g = SyncedGraph("clientdemo")
 
     L9 = Namespace("http://light9.bigasterisk.com/")
     def updateDemoValue():
@@ -22,8 +22,8 @@
     g.addHandler(updateDemoValue)
 
     def adj():
-        g.patch(rdfdb.Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), L9['clientdemo'])],
-                delTriples=[]))
+        g.patch(Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()),
+                                 L9['clientdemo'])],
+                      delQuads=[]))
     reactor.callLater(2, adj)
-
     reactor.run()
--- a/bin/rdfdb	Fri Jul 13 18:25:34 2012 +0000
+++ b/bin/rdfdb	Fri Jul 13 19:25:03 2012 +0000
@@ -21,15 +21,11 @@
 Maybe some subgraphs are for transient data (e.g. current timecode,
 mouse position in curvecalc) that only some listeners want to hear about.
 
-Deletes aren't graph-specific, they affect all graphs at once, since
-this seems much less confusing to the caller trying to delete a
-statement. But, this may lead to weird things when two graphs have the
-same statement, and then one deletes it. Or when deleting a stmt that
-you see in file1 causes an edit to file2. This plan is making it hard
-to invert a patch, so it's about to change.
+Deletes are graph-specific, so callers may be surprised to delete a
+stmt from one graph but then find that statement is still true.
 
-Alternate plan for deletes: insist that every patch is only within one
-subgraph, and just leave dup statements from other graphs alone.
+Alternate plan: would it help to insist that every patch is within
+only one subgraph? I think it's ok for them to span multiple ones.
 
 Inserts can be made on any subgraphs, and each subgraph is saved in
 its own file. The file might not be in a format that can express
@@ -78,16 +74,17 @@
 
 """
 from twisted.internet import reactor
+import twisted.internet.error
 import sys, optparse, logging, json, os
 import cyclone.web, cyclone.httpclient, cyclone.websocket
-from rdflib import URIRef
 sys.path.append(".")
 from light9 import networking, showconfig
 from rdflib import ConjunctiveGraph, URIRef, Graph
-from light9 import rdfdb 
+from light9.rdfdb.graphfile import GraphFile
+from light9.rdfdb.patch import Patch, ALLSTMTS
+from light9.rdfdb import syncedgraph
 
 from twisted.internet.inotify import INotify
-from twisted.python.filepath import FilePath
 logging.basicConfig(level=logging.DEBUG)
 log = logging.getLogger()
 
@@ -100,46 +97,28 @@
         pass
 
 class Client(object):
-    def __init__(self, updateUri, db):
+    """
+    one of our syncedgraph clients
+    """
+    def __init__(self, updateUri, label, db):
         self.db = db
+        self.label = label
         self.updateUri = updateUri
         self.sendAll()
 
+    def __repr__(self):
+        return "<%s client at %s>" % (self.label, self.updateUri)
+
     def sendAll(self):
         """send the client the whole graph contents"""
-        log.info("sending all graphs to %s" % self.updateUri)
-        self.sendPatch(rdfdb.Patch(
-            addQuads=self.db.graph.quads(rdfdb.ALLSTMTS),
-            delTriples=[]))
+        log.info("sending all graphs to %s at %s" %
+                 (self.label, self.updateUri))
+        self.sendPatch(Patch(
+            addQuads=self.db.graph.quads(ALLSTMTS),
+            delQuads=[]))
         
     def sendPatch(self, p):
-        rdfdb.sendPatch(self.updateUri, p)
-        # err something if the client is gone, so it can be dropped
-        # from the list
-
-class GraphFile(object):
-    def __init__(self, notifier, path, uri, patch, getSubgraph):
-        self.path, self.uri = path, uri
-        self.patch, self.getSubgraph = patch, getSubgraph
-
-        notifier.watch(FilePath(path), callbacks=[self.notify])
-        self.reread()
-      
-    def notify(self, notifier, filepath, mask):
-        log.info("file %s changed" % filepath)
-        self.reread()
-
-    def reread(self):
-        """update tha graph with any diffs from this file"""
-        old = self.getSubgraph(self.uri)
-        new = Graph()
-        new.parse(location=self.path, format='n3')
-
-        adds = [(s,p,o,self.uri) for s,p,o in new-old]
-        dels = [(s,p,o) for s,p,o in old-new]
-
-        if adds or dels:
-            self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels))
+        return syncedgraph.sendPatch(self.updateUri, p)
 
 class Db(object):
     def __init__(self):
@@ -162,18 +141,28 @@
         apply this patch to the master graph then notify everyone about it
         """
         log.info("patching graph with %s adds %s dels" %
-                 (len(p.addQuads), len(p.delTriples)))
-        for s in p.delTriples:
-            self.graph.remove(s)
+                 (len(p.addQuads), len(p.delQuads)))
+
+        
+        for spoc in p.delQuads:
+            # probably need to insist that these existed, or else cull
+            # the ones that didn't exist, to make the patch invert right
+            self.graph.get_context(spoc[3]).remove(spoc[:3])
 
         addQuads = p.addQuads[:2] # test
 
         self.graph.addN(addQuads)
         self.summarizeToLog()
         for c in self.clients:
-            c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples))
+            d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads))
+            d.addErrback(self.clientErrored, c)
         sendToLiveClients(asJson=p.jsonRepr)
 
+    def clientErrored(self, err, c):
+        err.trap(twisted.internet.error.ConnectError)
+        log.info("connection error- dropping client %r" % c)
+        self.clients.remove(c)
+
     def summarizeToLog(self):
         log.info("contexts in graph %s:" % len(self.graph))
         for c in self.graph.contexts():
@@ -185,21 +174,22 @@
         #return self.graph.get_context(uri)
 
         g = Graph()
-        for s in self.graph.triples(rdfdb.ALLSTMTS, uri):
+        for s in self.graph.triples(ALLSTMTS, uri):
             g.add(s)
         return g
     
-    def addClient(self, updateUri):
+    def addClient(self, updateUri, label):
         [self.clients.remove(c)
          for c in self.clients if c.updateUri == updateUri]
 
-        log.info("new client from %s" % updateUri)
-        self.clients.append(Client(updateUri, self))
+        log.info("new client %s at %s" % (label, updateUri))
+        self.clients.append(Client(updateUri, label, self))
         self.sendClientsToAllLivePages()
 
     def sendClientsToAllLivePages(self):
-        sendToLiveClients({"clients":[c.updateUri for c in self.clients]})
-        
+        sendToLiveClients({"clients":[
+            dict(updateUri=c.updateUri, label=c.label)
+            for c in self.clients]})        
 
 class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
@@ -213,7 +203,7 @@
 class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
     def __init__(self, *args, **kw):
         cyclone.web.RequestHandler.__init__(self, *args, **kw)
-        p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch)
+        p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch)
         self.put = lambda: p(self)
 
     def get(self):
@@ -227,7 +217,7 @@
     def post(self):
         upd = self.get_argument("clientUpdate")
         try:
-            self.settings.db.addClient(upd)
+            self.settings.db.addClient(upd, self.get_argument("label"))
         except:
             import traceback
             traceback.print_exc()
--- a/light9/rdfdb.py	Fri Jul 13 18:25:34 2012 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,214 +0,0 @@
-from rdflib import ConjunctiveGraph, Graph
-import json, logging, cyclone.httpclient, traceback, urllib
-from twisted.internet import reactor
-log = logging.getLogger()
-
-ALLSTMTS = (None, None, None)
-
-class Patch(object):
-    """
-    the json representation includes the {"patch":...} wrapper
-    """
-    def __init__(self, jsonRepr=None, addQuads=None, delTriples=None,
-                 addGraph=None, delGraph=None):
-        self._jsonRepr = jsonRepr
-        self._addQuads, self._delTriples = addQuads, delTriples
-        self._addGraph, self._delGraph = addGraph, delGraph
-
-        if self._jsonRepr is not None:
-            body = json.loads(self._jsonRepr)
-            self._delGraph = Graph()
-            self._delGraph.parse(data=body['patch']['deletes'], format='nt')
-            self._addGraph = ConjunctiveGraph()
-            self._addGraph.parse(data=body['patch']['adds'], format='nquads')
-
-    @property
-    def addQuads(self):
-        if self._addQuads is None:
-            if self._addGraph is not None:
-                self._addQuads = list(self._addGraph.quads(ALLSTMTS))
-            else:
-                raise
-        return self._addQuads
-
-    @property
-    def delTriples(self):
-        if self._delTriples is None:
-            if self._delGraph is not None:
-                self._delTriples = list(self._delGraph.triples(ALLSTMTS))
-            else:
-                raise
-        return self._delTriples
-
-    @property
-    def addGraph(self):
-        if self._addGraph is None:
-            raise
-        return self._addGraph
-
-    @property
-    def delGraph(self):
-        if self._delGraph is None:
-            raise
-        return self._delGraph
-
-    @property
-    def jsonRepr(self):
-        if self._jsonRepr is None:
-            addGraph = ConjunctiveGraph()
-            #addGraph.addN(addQuads) # no effect on nquad output
-            for s,p,o,c in self.addQuads:
-                addGraph.get_context(c).add((s,p,o))
-                #addGraph.store.add((s,p,o), c) # no effect on nquad output
-            delGraph = Graph()
-            for s in self.delTriples:
-                delGraph.add(s)
-            self._jsonRepr = json.dumps({"patch": {
-                'adds':addGraph.serialize(format='nquads'),
-                'deletes':delGraph.serialize(format='nt'),
-                }})
-        return self._jsonRepr
-
-def sendPatch(putUri, patch):
-
-    # this will take args for sender, etc
-    body = patch.jsonRepr
-    log.debug("send body: %r" % body)
-    def ok(done):
-        if not str(done.code).startswith('2'):
-            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
-        log.debug("sendPatch finished, response: %s" % done.body)
-        return done
-
-    def err(e):
-        log.warn("sendPatch failed %r" % e)
-        raise e
-
-    return cyclone.httpclient.fetch(
-        url=putUri,
-        method='PUT',
-        headers={'Content-Type': ['application/json']},
-        postdata=body,
-        ).addCallbacks(ok, err)
-
-def makePatchEndpointPutMethod(cb):
-    def put(self):
-        try:
-            p = Patch(jsonRepr=self.request.body)
-            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
-            cb(p)
-        except:
-            traceback.print_exc()
-            raise
-    return put
-
-def makePatchEndpoint(cb):
-    class Update(cyclone.web.RequestHandler):
-        put = makePatchEndpointPutMethod(cb)
-    return Update
-
-class GraphWatchers(object):
-    def __init__(self):
-        self._handlersSp = {} # (s,p): set(handlers)
-
-    def addSubjPredWatcher(self, func, s, p):
-        if func is None:
-            return
-        key = s, p
-        self._handlersSp.setdefault(key, set()).add(func)
-
-    def whoCares(self, p):
-        """what functions would care about the changes in this patch"""
-        ret = set()
-        for s in self._handlersSp.values():
-            ret.update(s)
-        return ret
-
-class SyncedGraph(object):
-    """
-    api like rdflib.Graph which sends updates to rdfdb and can call
-    you back when there are graph changes
-    """
-    def __init__(self, port):
-        _graph = self._graph = ConjunctiveGraph()
-        self._watchers = GraphWatchers()
-        
-        #then i try adding a statement that i will react to if i see it
-        #then i print updates to that statement as they come
-        #and the statement has a PID in it so we can see two clientdemos competing
-        #then factor out this client, and have real light9 tools start using it to build their graphs
-        #they just do full reload on relevant subgraphs at first, get progressively better
-        
-        def onPatch(p):
-            for s in p.delGraph:
-                _graph.remove(s)
-            _graph.addN(p.addGraph.quads(ALLSTMTS))
-            log.info("graph now has %s statements" % len(_graph))
-            self.updateOnPatch(p)
-
-        reactor.listenTCP(port, cyclone.web.Application(handlers=[
-            (r'/update', makePatchEndpoint(onPatch)),
-        ]))
-        self.updateResource = 'http://localhost:%s/update' % port
-        log.info("listening on %s" % port)
-        self.register()
-
-    def updateOnPatch(self, p):
-        for func in self._watchers.whoCares(p):
-            self.addHandler(func)
-
-    def register(self):
-
-        def done(x):
-            print "registered", x.body
-
-        cyclone.httpclient.fetch(
-            url='http://localhost:8051/graphClients',
-            method='POST',
-            headers={'Content-Type': ['application/x-www-form-urlencoded']},
-            postdata=urllib.urlencode([('clientUpdate', self.updateResource)]),
-            ).addCallbacks(done, log.error)
-        log.info("registering with rdfdb")
-
-    def patch(self, p):
-        """send this patch to the server and apply it to our local graph and run handlers"""
-        # currently this has to round-trip. But I could apply the
-        # patch here and have the server not bounce it back to me
-        return sendPatch('http://localhost:8051/patches', p)
-
-    def addHandler(self, func):
-        """
-        run this (idempotent) func, noting what graph values it
-        uses. Run it again in the future if there are changes to those
-        graph values. The func might use different values during that
-        future call, and those will be what we watch for next.
-        """
-
-        # if we saw this func before, we need to forget the old
-        # callbacks it wanted and replace with the new ones we see
-        # now.
-
-        # if one handler func calls another, does that break anything?
-        # maybe not?
-
-        # no plan for sparql queries yet. Hook into a lower layer that
-        # reveals all their statement fetches? Just make them always
-        # new? Cache their results, so if i make the query again and
-        # it gives the same result, I don't call the handler?
-        
-        self.currentFunc = func
-        try:
-            func()
-        finally:
-            self.currentFunc = None
-
-    # these just call through to triples() so it might be possible to
-    # watch just that one
-    def value(self, subj, pred):
-        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
-        return self._graph.value(subj, pred)
-
-    def objects(self, subject=None, predicate=None):
-        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
-        return self._graph.objects(subject, predicate)
-    
--- a/light9/rdfdb.xhtml	Fri Jul 13 18:25:34 2012 +0000
+++ b/light9/rdfdb.xhtml	Fri Jul 13 19:25:03 2012 +0000
@@ -22,6 +22,14 @@
       .patch .deletes {
           color: #DC6F6F;
       }
+      #out {
+          white-space: pre-wrap;
+      }
+      .patch fieldset {
+          color: gray;
+          font-family: arial;
+          font-size: 75%;
+      }
       /* ]]> */
     </style>
   </head>
@@ -64,7 +72,7 @@
                   );
               }
 
-              $('#out').append($('<div>').text(JSON.stringify(evt.data)));
+              $('#out').append($('<div>').text(evt.data));
           };
       });
       // ]]>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/graphfile.py	Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,33 @@
+import logging
+from twisted.python.filepath import FilePath
+from rdflib import Graph
+from light9.rdfdb.patch import Patch
+
+log = logging.getLogger()
+
+class GraphFile(object):
+    """
+    one rdf file that we read from, write to, and notice external changes to
+    """
+    def __init__(self, notifier, path, uri, patch, getSubgraph):
+        self.path, self.uri = path, uri
+        self.patch, self.getSubgraph = patch, getSubgraph
+
+        notifier.watch(FilePath(path), callbacks=[self.notify])
+        self.reread()
+      
+    def notify(self, notifier, filepath, mask):
+        log.info("file %s changed" % filepath)
+        self.reread()
+
+    def reread(self):
+        """update the graph with any diffs from this file"""
+        old = self.getSubgraph(self.uri)
+        new = Graph()
+        new.parse(location=self.path, format='n3')
+
+        adds = [(s, p, o, self.uri) for s, p, o in new - old]
+        dels = [(s, p, o, self.uri) for s, p, o in old - new]
+
+        if adds or dels:
+            self.patch(Patch(addQuads=adds, delQuads=dels))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/patch.py	Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,68 @@
+import json
+from rdflib import ConjunctiveGraph
+
+ALLSTMTS = (None, None, None)
+
+def graphFromQuads(q):
+    g = ConjunctiveGraph()
+    #g.addN(q) # no effect on nquad output
+    for s,p,o,c in q:
+        g.get_context(c).add((s,p,o))
+        #g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+class Patch(object):
+    """
+    the json representation includes the {"patch":...} wrapper
+    """
+    def __init__(self, jsonRepr=None, addQuads=None, delQuads=None,
+                 addGraph=None, delGraph=None):
+        self._jsonRepr = jsonRepr
+        self._addQuads, self._delQuads = addQuads, delQuads
+        self._addGraph, self._delGraph = addGraph, delGraph
+
+        if self._jsonRepr is not None:
+            body = json.loads(self._jsonRepr)
+            self._delGraph = ConjunctiveGraph()
+            self._delGraph.parse(data=body['patch']['deletes'], format='nquads')
+            self._addGraph = ConjunctiveGraph()
+            self._addGraph.parse(data=body['patch']['adds'], format='nquads')
+
+    @property
+    def addQuads(self):
+        if self._addQuads is None:
+            if self._addGraph is not None:
+                self._addQuads = list(self._addGraph.quads(ALLSTMTS))
+            else:
+                raise
+        return self._addQuads
+
+    @property
+    def delQuads(self):
+        if self._delQuads is None:
+            if self._delGraph is not None:
+                self._delQuads = list(self._delGraph.quads(ALLSTMTS))
+            else:
+                raise
+        return self._delQuads
+
+    @property
+    def addGraph(self):
+        if self._addGraph is None:
+            self._addGraph = graphFromQuads(self._addQuads)
+        return self._addGraph
+
+    @property
+    def delGraph(self):
+        if self._delGraph is None:
+            self._delGraph = graphFromQuads(self._delQuads)
+        return self._delGraph
+
+    @property
+    def jsonRepr(self):
+        if self._jsonRepr is None:
+            self._jsonRepr = json.dumps({"patch": {
+                'adds':self.addGraph.serialize(format='nquads'),
+                'deletes':self.delGraph.serialize(format='nquads'),
+                }})
+        return self._jsonRepr
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/syncedgraph.py	Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,152 @@
+from rdflib import ConjunctiveGraph
+import logging, cyclone.httpclient, traceback, urllib
+from twisted.internet import reactor
+log = logging.getLogger()
+from light9.rdfdb.patch import Patch, ALLSTMTS
+
+def sendPatch(putUri, patch):
+    # this will take args for sender, etc
+    body = patch.jsonRepr
+    log.debug("send body: %r" % body)
+    def ok(done):
+        if not str(done.code).startswith('2'):
+            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
+        log.debug("sendPatch finished, response: %s" % done.body)
+        return done
+
+    return cyclone.httpclient.fetch(
+        url=putUri,
+        method='PUT',
+        headers={'Content-Type': ['application/json']},
+        postdata=body,
+        ).addCallback(ok)
+
+def makePatchEndpointPutMethod(cb):
+    def put(self):
+        try:
+            p = Patch(jsonRepr=self.request.body)
+            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
+            cb(p)
+        except:
+            traceback.print_exc()
+            raise
+    return put
+
+def makePatchEndpoint(cb):
+    class Update(cyclone.web.RequestHandler):
+        put = makePatchEndpointPutMethod(cb)
+    return Update
+
+class GraphWatchers(object):
+    def __init__(self):
+        self._handlersSp = {} # (s,p): set(handlers)
+
+    def addSubjPredWatcher(self, func, s, p):
+        if func is None:
+            return
+        key = s, p
+        self._handlersSp.setdefault(key, set()).add(func)
+
+    def whoCares(self, p):
+        """what functions would care about the changes in this patch"""
+        ret = set()
+        for s in self._handlersSp.values():
+            ret.update(s)
+        return ret
+
+class SyncedGraph(object):
+    """
+    graph for clients to use. Changes are synced with the master graph
+    in the rdfdb process. 
+    
+    This api is like rdflib.Graph but it can also call you back when
+    there are graph changes to the parts you previously read.
+    """
+    def __init__(self, label):
+        """
+        label is a string that the server will display in association
+        with your connection
+        """
+        _graph = self._graph = ConjunctiveGraph()
+        self._watchers = GraphWatchers()
+        
+        def onPatch(p):
+            for spoc in p.delGraph.quads(ALLSTMTS):
+                _graph.get_context(spoc[3]).remove(spoc[:3])
+            _graph.addN(p.addGraph.quads(ALLSTMTS))
+            log.info("graph now has %s statements" % len(_graph))
+            try:
+                self.updateOnPatch(p)
+            except Exception:
+                # don't reflect this back to the server; we did
+                # receive its patch correctly.
+                traceback.print_exc()
+
+        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
+            (r'/update', makePatchEndpoint(onPatch)),
+        ]))
+        port = listen._realPortNumber  # what's the right call for this?
+        self.updateResource = 'http://localhost:%s/update' % port
+        log.info("listening on %s" % port)
+        self.register(label)
+
+    def updateOnPatch(self, p):
+        for func in self._watchers.whoCares(p):
+            self.addHandler(func)
+
+    def register(self, label):
+
+        def done(x):
+            print "registered", x.body
+
+        cyclone.httpclient.fetch(
+            url='http://localhost:8051/graphClients',
+            method='POST',
+            headers={'Content-Type': ['application/x-www-form-urlencoded']},
+            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
+                                       ('label', label)]),
+            ).addCallbacks(done, log.error)
+        log.info("registering with rdfdb")
+
+    def patch(self, p):
+        """send this patch to the server and apply it to our local graph and run handlers"""
+        # currently this has to round-trip. But I could apply the
+        # patch here and have the server not bounce it back to me
+        return sendPatch('http://localhost:8051/patches', p)
+
+    def addHandler(self, func):
+        """
+        run this (idempotent) func, noting what graph values it
+        uses. Run it again in the future if there are changes to those
+        graph values. The func might use different values during that
+        future call, and those will be what we watch for next.
+        """
+
+        # if we saw this func before, we need to forget the old
+        # callbacks it wanted and replace with the new ones we see
+        # now.
+
+        # if one handler func calls another, does that break anything?
+        # maybe not?
+
+        # no plan for sparql queries yet. Hook into a lower layer that
+        # reveals all their statement fetches? Just make them always
+        # new? Cache their results, so if i make the query again and
+        # it gives the same result, I don't call the handler?
+        
+        self.currentFunc = func
+        try:
+            func()
+        finally:
+            self.currentFunc = None
+
+    # these just call through to triples() so it might be possible to
+    # watch just that one
+    def value(self, subj, pred):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
+        return self._graph.value(subj, pred)
+
+    def objects(self, subject=None, predicate=None):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
+        return self._graph.objects(subject, predicate)
+