@@ -116,13 +116,15 @@ import cyclone.web, cyclone.httpclient, 
from light9 import networking, showconfig, prof
from rdflib import ConjunctiveGraph, URIRef, Graph
from light9.rdfdb.graphfile import GraphFile
from light9.rdfdb.patch import Patch, ALLSTMTS
from light9.rdfdb.rdflibpatch import patchQuads
from light9.rdfdb import syncedgraph

from light9.rdfdb.patchsender import sendPatch
from light9.rdfdb.patchreceiver import makePatchEndpointPutMethod

from twisted.internet.inotify import INotify
from run_local import log

@@ -152,13 +154,13 @@ class Client(object):
                 (self.label, self.updateUri))

    def sendPatch(self, p):
        return syncedgraph.sendPatch(self.updateUri, p)
        return sendPatch(self.updateUri, p)

class WatchedFiles(object):
    find files, notice new files.

    This object watches directories. Each GraphFile watches its own file.
@@ -337,13 +339,13 @@ class GraphResource(PrettyErrorHandler, 
    def get(self):

class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
    def __init__(self, *args, **kw):
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
        p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch)
        p = makePatchEndpointPutMethod(self.settings.db.patch)
        self.put = lambda: p(self)

    def get(self):

class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
new file 100644
new file 100644
import logging
from rdflib import RDF, RDFS
from light9.rdfdb.currentstategraphapi import contextsForStatementNoWildcards
log = logging.getLogger('syncedgraph')

class AutoDepGraphApi(object):
    mixin for SyncedGraph, separated here because these methods work together

    def __init__(self):
        self._watchers = GraphWatchers()
        self.currentFuncs = [] # stack of addHandler callers
    def addHandler(self, func):
        run this (idempotent) func, noting what graph values it
        uses. Run it again in the future if there are changes to those
        graph values. The func might use different values during that
        future call, and those will be what we watch for next.

        # if we saw this func before, we need to forget the old
        # callbacks it wanted and replace with the new ones we see
        # now.

        # if one handler func calls another, does that break anything?
        # maybe not?

        # no plan for sparql queries yet. Hook into a lower layer that
        # reveals all their statement fetches? Just make them always
        # new? Cache their results, so if i make the query again and
        # it gives the same result, I don't call the handler?


    def updateOnPatch(self, p):
        patch p just happened to the graph; call everyone back who
        might care, and then notice what data they depend on now
        for func in self._watchers.whoCares(p):
            # todo: forget the old handlers for this func

    def _getCurrentFunc(self):
        if not self.currentFuncs:
            # this may become a warning later
            raise ValueError("asked for graph data outside of a handler")

        # we add the watcher to the deepest function, since that
        # should be the cheapest way to update when this part of the
        # data changes
        return self.currentFuncs[-1]

    # these just call through to triples() so it might be possible to
    # watch just that one.

    # if you get a bnode in your response, maybe the answer to
    # dependency tracking is to say that you depended on the triple
    # that got you that bnode, since it is likely to change to another
    # bnode later. This won't work if the receiver stores bnodes
    # between calls, but probably most of them don't do that (they
    # work from a starting uri)

    def value(self, subject=None, predicate=RDF.value, object=None,
              default=None, any=True):
        if object is not None:
            raise NotImplementedError()
        func = self._getCurrentFunc()
        self._watchers.addSubjPredWatcher(func, subject, predicate)
        return self._graph.value(subject, predicate, object=object,
                                 default=default, any=any)

    def objects(self, subject=None, predicate=None):
        func = self._getCurrentFunc()
        self._watchers.addSubjPredWatcher(func, subject, predicate)
        return self._graph.objects(subject, predicate)

    def label(self, uri):
        return self.value(uri, RDFS.label)

    def subjects(self, predicate=None, object=None):
        func = self._getCurrentFunc()
        self._watchers.addPredObjWatcher(func, predicate, object)
        return self._graph.subjects(predicate, object)

    def contextsForStatement(self, triple):
        """currently this needs to be in an addHandler section, but it
        sets no watchers so it won't actually update if the statement
        was added or dropped from contexts"""
        func = self._getCurrentFunc()
        return contextsForStatementNoWildcards(self._graph, triple)

    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
    # you only what results have just appeared or disappeared. I think
    # I'm going to be repeating that logic a lot. Maybe just for the
    # subjects(RDF.type, t) call


class GraphWatchers(object):
    store the current handlers that care about graph changes
    def __init__(self):
        self._handlersSp = {} # (s,p): set(handlers)
        self._handlersPo = {} # (p,o): set(handlers)

    def addSubjPredWatcher(self, func, s, p):
        if func is None:
        key = s, p
            self._handlersSp.setdefault(key, set()).add(func)
        except Exception:
            log.error("with key %r and func %r" % (key, func))

    def addPredObjWatcher(self, func, p, o):
        self._handlersPo.setdefault((p, o), set()).add(func)

    def whoCares(self, patch):
        """what handler functions would care about the changes in this patch?

        this removes the handlers that it gives you
        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
                                [(s, p) for s, p, o, c in patch.delQuads])
        affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+
                                [(p, o) for s, p, o, c in patch.delQuads])

        ret = set()
        for (s, p), funcs in self._handlersSp.iteritems():
            if (s, p) in affectedSubjPreds:

        for (p, o), funcs in self._handlersPo.iteritems():
            if (p, o) in affectedPredObjs:

        return ret

    def dependencies(self):
        for debugging, make a list of all the active handlers and what
        data they depend on. This is meant for showing on the web ui
        for browsing.
        from pprint import pprint
new file 100644
new file 100644
from rdflib import ConjunctiveGraph
from light9.rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement

class CurrentStateGraphApi(object):
    def currentState(self, context=None):
        a graph you can read without being in an addHandler
        if context is not None:
            raise NotImplementedError("currentState with context arg")

        class Mgr(object):
            def __enter__(self2):
                # this should be a readonly view of the existing
                # graph, maybe with something to guard against
                # writes/patches happening while reads are being
                # done. Typical usage will do some reads on this graph
                # before moving on to writes.
                g = ConjunctiveGraph()
                for s,p,o,c in self._graph.quads((None,None,None)):
          ,p,o), c)
                g.contextsForStatement = lambda t: contextsForStatementNoWildcards(g, t)
                return g

            def __exit__(self, type, val, tb):

        return Mgr()

def contextsForStatementNoWildcards(g, triple):
    if None in triple:
        raise NotImplementedError("no wildcards")
    return rp_contextsForStatement(g, triple)
new file 100644
new file 100644
import random
from itertools import chain
from rdflib import URIRef, RDF
from light9.rdfdb.patch import Patch

class GraphEditApi(object):
    mixin for SyncedGraph, separated here because these methods work together

    def patchObject(self, context, subject, predicate, newObject):
        """send a patch which removes existing values for (s,p,*,c)
        and adds (s,p,newObject,c). Values in other graphs are not affected.

        newObject can be None, which will remove all (subj,pred,*) statements.

        existing = []
        for spo in self._graph.triples((subject, predicate, None),
        # what layer is supposed to cull out no-op changes?
            addQuads=([(subject, predicate, newObject, context)]
                      if newObject is not None else [])))

    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
        creates/updates a structure like this:

           ?subject ?predicate [
             a ?nodeClass;
             ?keyPred ?newKey;
             ?valuePred ?newValue ] .

        There should be a complementary readMapping that gets you a
        value since that's tricky too

        with self.currentState() as graph:
            adds = set([])
            for setting in graph.objects(subject, predicate):
                if graph.value(setting, keyPred) == newKey:
                setting = URIRef(subject + "/map/%s" %
                    (subject, predicate, setting, context),
                    (setting, RDF.type, nodeClass, context),
                    (setting, keyPred, newKey, context),
            dels = set([])
            for prev in graph.objects(setting, valuePred):
                dels.add((setting, valuePred, prev, context))
            adds.add((setting, valuePred, newValue, context))

            if adds != dels:
                self.patch(Patch(delQuads=dels, addQuads=adds))

    def removeMappingNode(self, context, node):
        removes the statements with this node as subject or object, which
        is the right amount of statements to remove a node that
        patchMapping made.
        p = Patch(delQuads=[spo+(context,) for spo in
                            chain(self._graph.triples((None, None, node),
                                  self._graph.triples((node, None, None),
new file 100644
new file 100644
import logging, cyclone.httpclient, traceback, urllib
from twisted.internet import reactor
from light9.rdfdb.rdflibpatch import patchQuads
from light9.rdfdb.patch import Patch
log = logging.getLogger('syncedgraph')

class PatchReceiver(object):
    runs a web server in this process and registers it with the rdfdb
    master. See onPatch for what happens when the rdfdb master sends
    us a patch
    def __init__(self, label, graph, initiallySynced):
        self.graph = graph
        self.initiallySynced = initiallySynced
        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
            (r'/update', makePatchEndpoint(self.onPatch)),
        port = listen._realPortNumber  # what's the right call for this?
        self.updateResource = 'http://localhost:%s/update' % port
"listening on %s" % port)

    def onPatch(self, p):
        central server has sent us a patch
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
"graph now has %s statements" % len(self.graph))
        except Exception:
            # don't reflect this back to the server; we did
            # receive its patch correctly.

        if self.initiallySynced:
            self.initiallySynced = None

    def register(self, label):

        def done(x):
            log.debug("registered with rdfdb")

            headers={'Content-Type': ['application/x-www-form-urlencoded']},
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
                                       ('label', label)]),
            ).addCallbacks(done, log.error)
"registering with rdfdb")

def makePatchEndpointPutMethod(cb):
    def put(self):
            p = Patch(jsonRepr=self.request.body)
  "received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
    return put

def makePatchEndpoint(cb):
    class Update(cyclone.web.RequestHandler):
        put = makePatchEndpointPutMethod(cb)
    return Update
new file 100644
new file 100644
import logging
import cyclone.httpclient
from twisted.internet import defer
log = logging.getLogger('syncedgraph')

def sendPatch(putUri, patch, **kw):
    kwargs will become extra attributes in the toplevel json object
    body = patch.makeJsonRepr(kw)
    log.debug("send body: %r", body)
    def ok(done):
        if not str(done.code).startswith('2'):
            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
        log.debug("sendPatch finished, response: %s" % done.body)
        return done

    return cyclone.httpclient.fetch(
        headers={'Content-Type': ['application/json']},

class PatchSender(object):
    SyncedGraph may generate patches faster than we can send
    them. This object buffers and may even collapse patches before
    they go the server
    def __init__(self, target, myUpdateResource):
 = target
        self.myUpdateResource = myUpdateResource
        self._patchesToSend = []
        self._currentSendPatchRequest = None

    def sendPatch(self, p):
        sendResult = defer.Deferred()
        self._patchesToSend.append((p, sendResult))
        return sendResult

    def _continueSending(self):
        if not self._patchesToSend or self._currentSendPatchRequest:
        if len(self._patchesToSend) > 1:
  "%s patches left to send", len(self._patchesToSend))
            # this is where we could concatenate little patches into a
            # bigger one. Often, many statements will cancel each
            # other out. not working yet:
            if 0:
                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
                print "concat down to"
                print 'dels'
                for q in p.delQuads: print q
                print 'adds'
                for q in p.addQuads: print q
                print "----"
                p, sendResult = self._patchesToSend.pop(0)
            p, sendResult = self._patchesToSend.pop(0)

        self._currentSendPatchRequest = sendPatch(
  , p, senderUpdateUri=self.myUpdateResource)

    def _sendPatchDone(self, result):
        self._currentSendPatchRequest = None

    def _sendPatchErr(self, e):
        self._currentSendPatchRequest = None
        # we're probably out of sync with the master now, since
        # SyncedGraph.patch optimistically applied the patch to our
        # local graph already. What happens to this patch? What
        # happens to further pending patches? Some of the further
        # patches, especially, may be commutable with the bad one and
        # might still make sense to apply to the master graph.

        # if someday we are folding pending patches together, this
        # would be the time to UNDO that and attempt the original
        # separate patches again

        # this should screen for 409 conflict responses and raise a
        # special exception for that, so SyncedGraph.sendFailed can
        # screen for only that type

        # this code is going away; we're going to raise an exception that contains all the pending patches

new file 100644
from rdflib import ConjunctiveGraph, RDFS, RDF, URIRef
import logging, cyclone.httpclient, traceback, urllib, random
from itertools import chain
from twisted.internet import reactor, defer
client code uses a SyncedGraph, which has a few things:

AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
way that lets me call you again when there were changes to the things
you queried

CurrentStateGraphApi - a way to query the graph that doesn't gather
your dependencies like AutoDepGraphApi does

GraphEditApi - methods to write patches to the graph for common
operations, e.g. replacing a value, or editing a mapping

PatchReceiver - our web server that listens to edits from the master graph

PatchSender - collects and transmits your graph edits

from rdflib import ConjunctiveGraph
import logging, cyclone.httpclient
from twisted.internet import defer
log = logging.getLogger('syncedgraph')
from light9.rdfdb.patch import Patch
from light9.rdfdb.rdflibpatch import patchQuads, contextsForStatement as rp_contextsForStatement
from light9.rdfdb.rdflibpatch import patchQuads

from light9.rdfdb.patchsender import PatchSender
from light9.rdfdb.patchreceiver import PatchReceiver
from light9.rdfdb.currentstategraphapi import CurrentStateGraphApi
from light9.rdfdb.autodepgraphapi import AutoDepGraphApi
from light9.rdfdb.grapheditapi import GraphEditApi

# everybody who writes literals needs to get this
from rdflibpatch_literal import patch


class SyncedGraph(object):
class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
    graph for clients to use. Changes are synced with the master graph
    in the rdfdb process.

    This api is like rdflib.Graph but it can also call you back when
    there are graph changes to the parts you previously read.
@@ -196,42 +55,18 @@ class SyncedGraph(object):
        label is a string that the server will display in association
        with your connection
        self.initiallySynced = defer.Deferred()
        _graph = self._graph = ConjunctiveGraph()
        self._watchers = GraphWatchers()

        def onPatch(p):
            central server has sent us a patch
            patchQuads(_graph, p.delQuads, p.addQuads, perfect=True)
  "graph now has %s statements" % len(_graph))
            except Exception:
                # don't reflect this back to the server; we did
                # receive its patch correctly.
        self._receiver = PatchReceiver(label, _graph, self.initiallySynced)

            if self.initiallySynced:
                self.initiallySynced = None


        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
            (r'/update', makePatchEndpoint(onPatch)),
        port = listen._realPortNumber  # what's the right call for this?
        self.updateResource = 'http://localhost:%s/update' % port
"listening on %s" % port)
        self.currentFuncs = [] # stack of addHandler callers
        self._sender = PatchSender('http://localhost:8051/patches',

    def resync(self):
        get the whole graph again from the server (e.g. we had a
        conflict while applying a patch and want to return to the
@@ -252,26 +87,12 @@ class SyncedGraph(object):

    def _resyncGraph(self, response):
        #diff against old entire graph
        #broadcast that change

    def register(self, label):

        def done(x):
            log.debug("registered with rdfdb")

            headers={'Content-Type': ['application/x-www-form-urlencoded']},
            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
                                       ('label', label)]),
            ).addCallbacks(done, log.error)
"registering with rdfdb")

    def patch(self, p):
        """send this patch to the server and apply it to our local
        graph and run handlers"""

        # these could fail if we're out of sync. One approach:
        # Rerequest the full state from the server, try the patch
@@ -287,191 +108,6 @@ class SyncedGraph(object):
        that ultimately failed because of a conflict
        print "sendFailed"
        #i think we should receive back all the pending patches,
        #do a resysnc here,
        #then requeue all the pending patches (minus the failing one?) after that's done.


