# HG changeset patch # User drewp@bigasterisk.com # Date 2012-07-13 19:25:03 # Node ID 904913de45990ebdf0752b8d1a752bb918cc1e77 # Parent 37d05bd17b108f1a5a6de87f142b2ac0c39a211c deletes are now quads. refactor files. named clients. auto client port Ignore-this: 44f83643c28cbb0f961e2c8c1267d398 diff --git a/bin/clientdemo b/bin/clientdemo --- a/bin/clientdemo +++ b/bin/clientdemo @@ -5,14 +5,14 @@ sys.path.append(".") from twisted.internet import reactor import cyclone.web, cyclone.httpclient, logging from rdflib import Namespace, Literal -from light9 import rdfdb +from light9.rdfdb.patch import Patch +from light9.rdfdb.syncedgraph import SyncedGraph if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() - port = 8052 - g = rdfdb.SyncedGraph(port) + g = SyncedGraph("clientdemo") L9 = Namespace("http://light9.bigasterisk.com/") def updateDemoValue(): @@ -22,8 +22,8 @@ if __name__ == "__main__": g.addHandler(updateDemoValue) def adj(): - g.patch(rdfdb.Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), L9['clientdemo'])], - delTriples=[])) + g.patch(Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), + L9['clientdemo'])], + delQuads=[])) reactor.callLater(2, adj) - reactor.run() diff --git a/bin/rdfdb b/bin/rdfdb --- a/bin/rdfdb +++ b/bin/rdfdb @@ -21,15 +21,11 @@ Global data undo should probably happen Maybe some subgraphs are for transient data (e.g. current timecode, mouse position in curvecalc) that only some listeners want to hear about. -Deletes aren't graph-specific, they affect all graphs at once, since -this seems much less confusing to the caller trying to delete a -statement. But, this may lead to weird things when two graphs have the -same statement, and then one deletes it. Or when deleting a stmt that -you see in file1 causes an edit to file2. This plan is making it hard -to invert a patch, so it's about to change. +Deletes are graph-specific, so callers may be surprised to delete a +stmt from one graph but then find that statement is still true. -Alternate plan for deletes: insist that every patch is only within one -subgraph, and just leave dup statements from other graphs alone. +Alternate plan: would it help to insist that every patch is within +only one subgraph? I think it's ok for them to span multiple ones. Inserts can be made on any subgraphs, and each subgraph is saved in its own file. The file might not be in a format that can express @@ -78,16 +74,17 @@ them here. """ from twisted.internet import reactor +import twisted.internet.error import sys, optparse, logging, json, os import cyclone.web, cyclone.httpclient, cyclone.websocket -from rdflib import URIRef sys.path.append(".") from light9 import networking, showconfig from rdflib import ConjunctiveGraph, URIRef, Graph -from light9 import rdfdb +from light9.rdfdb.graphfile import GraphFile +from light9.rdfdb.patch import Patch, ALLSTMTS +from light9.rdfdb import syncedgraph from twisted.internet.inotify import INotify -from twisted.python.filepath import FilePath logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() @@ -100,46 +97,28 @@ except ImportError: pass class Client(object): - def __init__(self, updateUri, db): + """ + one of our syncedgraph clients + """ + def __init__(self, updateUri, label, db): self.db = db + self.label = label self.updateUri = updateUri self.sendAll() + def __repr__(self): + return "<%s client at %s>" % (self.label, self.updateUri) + def sendAll(self): """send the client the whole graph contents""" - log.info("sending all graphs to %s" % self.updateUri) - self.sendPatch(rdfdb.Patch( - addQuads=self.db.graph.quads(rdfdb.ALLSTMTS), - delTriples=[])) + log.info("sending all graphs to %s at %s" % + (self.label, self.updateUri)) + self.sendPatch(Patch( + addQuads=self.db.graph.quads(ALLSTMTS), + delQuads=[])) def sendPatch(self, p): - rdfdb.sendPatch(self.updateUri, p) - # err something if the client is gone, so it can be dropped - # from the list - -class GraphFile(object): - def __init__(self, notifier, path, uri, patch, getSubgraph): - self.path, self.uri = path, uri - self.patch, self.getSubgraph = patch, getSubgraph - - notifier.watch(FilePath(path), callbacks=[self.notify]) - self.reread() - - def notify(self, notifier, filepath, mask): - log.info("file %s changed" % filepath) - self.reread() - - def reread(self): - """update tha graph with any diffs from this file""" - old = self.getSubgraph(self.uri) - new = Graph() - new.parse(location=self.path, format='n3') - - adds = [(s,p,o,self.uri) for s,p,o in new-old] - dels = [(s,p,o) for s,p,o in old-new] - - if adds or dels: - self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels)) + return syncedgraph.sendPatch(self.updateUri, p) class Db(object): def __init__(self): @@ -162,18 +141,28 @@ class Db(object): apply this patch to the master graph then notify everyone about it """ log.info("patching graph with %s adds %s dels" % - (len(p.addQuads), len(p.delTriples))) - for s in p.delTriples: - self.graph.remove(s) + (len(p.addQuads), len(p.delQuads))) + + + for spoc in p.delQuads: + # probably need to insist that these existed, or else cull + # the ones that didn't exist, to make the patch invert right + self.graph.get_context(spoc[3]).remove(spoc[:3]) addQuads = p.addQuads[:2] # test self.graph.addN(addQuads) self.summarizeToLog() for c in self.clients: - c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)) + d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads)) + d.addErrback(self.clientErrored, c) sendToLiveClients(asJson=p.jsonRepr) + def clientErrored(self, err, c): + err.trap(twisted.internet.error.ConnectError) + log.info("connection error- dropping client %r" % c) + self.clients.remove(c) + def summarizeToLog(self): log.info("contexts in graph %s:" % len(self.graph)) for c in self.graph.contexts(): @@ -185,21 +174,22 @@ class Db(object): #return self.graph.get_context(uri) g = Graph() - for s in self.graph.triples(rdfdb.ALLSTMTS, uri): + for s in self.graph.triples(ALLSTMTS, uri): g.add(s) return g - def addClient(self, updateUri): + def addClient(self, updateUri, label): [self.clients.remove(c) for c in self.clients if c.updateUri == updateUri] - log.info("new client from %s" % updateUri) - self.clients.append(Client(updateUri, self)) + log.info("new client %s at %s" % (label, updateUri)) + self.clients.append(Client(updateUri, label, self)) self.sendClientsToAllLivePages() def sendClientsToAllLivePages(self): - sendToLiveClients({"clients":[c.updateUri for c in self.clients]}) - + sendToLiveClients({"clients":[ + dict(updateUri=c.updateUri, label=c.label) + for c in self.clients]}) class Index(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): @@ -213,7 +203,7 @@ class GraphResource(PrettyErrorHandler, class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): def __init__(self, *args, **kw): cyclone.web.RequestHandler.__init__(self, *args, **kw) - p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch) + p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch) self.put = lambda: p(self) def get(self): @@ -227,7 +217,7 @@ class GraphClients(PrettyErrorHandler, c def post(self): upd = self.get_argument("clientUpdate") try: - self.settings.db.addClient(upd) + self.settings.db.addClient(upd, self.get_argument("label")) except: import traceback traceback.print_exc() diff --git a/light9/rdfdb.xhtml b/light9/rdfdb.xhtml --- a/light9/rdfdb.xhtml +++ b/light9/rdfdb.xhtml @@ -22,6 +22,14 @@ .patch .deletes { color: #DC6F6F; } + #out { + white-space: pre-wrap; + } + .patch fieldset { + color: gray; + font-family: arial; + font-size: 75%; + } /* ]]> */ @@ -64,7 +72,7 @@ ); } - $('#out').append($('
').text(JSON.stringify(evt.data))); + $('#out').append($('
').text(evt.data)); }; }); // ]]> diff --git a/light9/rdfdb/__init__.py b/light9/rdfdb/__init__.py new file mode 100644 diff --git a/light9/rdfdb/graphfile.py b/light9/rdfdb/graphfile.py new file mode 100644 --- /dev/null +++ b/light9/rdfdb/graphfile.py @@ -0,0 +1,33 @@ +import logging +from twisted.python.filepath import FilePath +from rdflib import Graph +from light9.rdfdb.patch import Patch + +log = logging.getLogger() + +class GraphFile(object): + """ + one rdf file that we read from, write to, and notice external changes to + """ + def __init__(self, notifier, path, uri, patch, getSubgraph): + self.path, self.uri = path, uri + self.patch, self.getSubgraph = patch, getSubgraph + + notifier.watch(FilePath(path), callbacks=[self.notify]) + self.reread() + + def notify(self, notifier, filepath, mask): + log.info("file %s changed" % filepath) + self.reread() + + def reread(self): + """update the graph with any diffs from this file""" + old = self.getSubgraph(self.uri) + new = Graph() + new.parse(location=self.path, format='n3') + + adds = [(s, p, o, self.uri) for s, p, o in new - old] + dels = [(s, p, o, self.uri) for s, p, o in old - new] + + if adds or dels: + self.patch(Patch(addQuads=adds, delQuads=dels)) diff --git a/light9/rdfdb/patch.py b/light9/rdfdb/patch.py new file mode 100644 --- /dev/null +++ b/light9/rdfdb/patch.py @@ -0,0 +1,68 @@ +import json +from rdflib import ConjunctiveGraph + +ALLSTMTS = (None, None, None) + +def graphFromQuads(q): + g = ConjunctiveGraph() + #g.addN(q) # no effect on nquad output + for s,p,o,c in q: + g.get_context(c).add((s,p,o)) + #g.store.add((s,p,o), c) # no effect on nquad output + return g + +class Patch(object): + """ + the json representation includes the {"patch":...} wrapper + """ + def __init__(self, jsonRepr=None, addQuads=None, delQuads=None, + addGraph=None, delGraph=None): + self._jsonRepr = jsonRepr + self._addQuads, self._delQuads = addQuads, delQuads + self._addGraph, self._delGraph = addGraph, delGraph + + if self._jsonRepr is not None: + body = json.loads(self._jsonRepr) + self._delGraph = ConjunctiveGraph() + self._delGraph.parse(data=body['patch']['deletes'], format='nquads') + self._addGraph = ConjunctiveGraph() + self._addGraph.parse(data=body['patch']['adds'], format='nquads') + + @property + def addQuads(self): + if self._addQuads is None: + if self._addGraph is not None: + self._addQuads = list(self._addGraph.quads(ALLSTMTS)) + else: + raise + return self._addQuads + + @property + def delQuads(self): + if self._delQuads is None: + if self._delGraph is not None: + self._delQuads = list(self._delGraph.quads(ALLSTMTS)) + else: + raise + return self._delQuads + + @property + def addGraph(self): + if self._addGraph is None: + self._addGraph = graphFromQuads(self._addQuads) + return self._addGraph + + @property + def delGraph(self): + if self._delGraph is None: + self._delGraph = graphFromQuads(self._delQuads) + return self._delGraph + + @property + def jsonRepr(self): + if self._jsonRepr is None: + self._jsonRepr = json.dumps({"patch": { + 'adds':self.addGraph.serialize(format='nquads'), + 'deletes':self.delGraph.serialize(format='nquads'), + }}) + return self._jsonRepr diff --git a/light9/rdfdb.py b/light9/rdfdb/syncedgraph.py rename from light9/rdfdb.py rename to light9/rdfdb/syncedgraph.py --- a/light9/rdfdb.py +++ b/light9/rdfdb/syncedgraph.py @@ -1,76 +1,10 @@ -from rdflib import ConjunctiveGraph, Graph -import json, logging, cyclone.httpclient, traceback, urllib +from rdflib import ConjunctiveGraph +import logging, cyclone.httpclient, traceback, urllib from twisted.internet import reactor log = logging.getLogger() - -ALLSTMTS = (None, None, None) - -class Patch(object): - """ - the json representation includes the {"patch":...} wrapper - """ - def __init__(self, jsonRepr=None, addQuads=None, delTriples=None, - addGraph=None, delGraph=None): - self._jsonRepr = jsonRepr - self._addQuads, self._delTriples = addQuads, delTriples - self._addGraph, self._delGraph = addGraph, delGraph - - if self._jsonRepr is not None: - body = json.loads(self._jsonRepr) - self._delGraph = Graph() - self._delGraph.parse(data=body['patch']['deletes'], format='nt') - self._addGraph = ConjunctiveGraph() - self._addGraph.parse(data=body['patch']['adds'], format='nquads') - - @property - def addQuads(self): - if self._addQuads is None: - if self._addGraph is not None: - self._addQuads = list(self._addGraph.quads(ALLSTMTS)) - else: - raise - return self._addQuads - - @property - def delTriples(self): - if self._delTriples is None: - if self._delGraph is not None: - self._delTriples = list(self._delGraph.triples(ALLSTMTS)) - else: - raise - return self._delTriples - - @property - def addGraph(self): - if self._addGraph is None: - raise - return self._addGraph - - @property - def delGraph(self): - if self._delGraph is None: - raise - return self._delGraph - - @property - def jsonRepr(self): - if self._jsonRepr is None: - addGraph = ConjunctiveGraph() - #addGraph.addN(addQuads) # no effect on nquad output - for s,p,o,c in self.addQuads: - addGraph.get_context(c).add((s,p,o)) - #addGraph.store.add((s,p,o), c) # no effect on nquad output - delGraph = Graph() - for s in self.delTriples: - delGraph.add(s) - self._jsonRepr = json.dumps({"patch": { - 'adds':addGraph.serialize(format='nquads'), - 'deletes':delGraph.serialize(format='nt'), - }}) - return self._jsonRepr +from light9.rdfdb.patch import Patch, ALLSTMTS def sendPatch(putUri, patch): - # this will take args for sender, etc body = patch.jsonRepr log.debug("send body: %r" % body) @@ -80,16 +14,12 @@ def sendPatch(putUri, patch): log.debug("sendPatch finished, response: %s" % done.body) return done - def err(e): - log.warn("sendPatch failed %r" % e) - raise e - return cyclone.httpclient.fetch( url=putUri, method='PUT', headers={'Content-Type': ['application/json']}, postdata=body, - ).addCallbacks(ok, err) + ).addCallback(ok) def makePatchEndpointPutMethod(cb): def put(self): @@ -126,38 +56,45 @@ class GraphWatchers(object): class SyncedGraph(object): """ - api like rdflib.Graph which sends updates to rdfdb and can call - you back when there are graph changes + graph for clients to use. Changes are synced with the master graph + in the rdfdb process. + + This api is like rdflib.Graph but it can also call you back when + there are graph changes to the parts you previously read. """ - def __init__(self, port): + def __init__(self, label): + """ + label is a string that the server will display in association + with your connection + """ _graph = self._graph = ConjunctiveGraph() self._watchers = GraphWatchers() - #then i try adding a statement that i will react to if i see it - #then i print updates to that statement as they come - #and the statement has a PID in it so we can see two clientdemos competing - #then factor out this client, and have real light9 tools start using it to build their graphs - #they just do full reload on relevant subgraphs at first, get progressively better - def onPatch(p): - for s in p.delGraph: - _graph.remove(s) + for spoc in p.delGraph.quads(ALLSTMTS): + _graph.get_context(spoc[3]).remove(spoc[:3]) _graph.addN(p.addGraph.quads(ALLSTMTS)) log.info("graph now has %s statements" % len(_graph)) - self.updateOnPatch(p) + try: + self.updateOnPatch(p) + except Exception: + # don't reflect this back to the server; we did + # receive its patch correctly. + traceback.print_exc() - reactor.listenTCP(port, cyclone.web.Application(handlers=[ + listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[ (r'/update', makePatchEndpoint(onPatch)), ])) + port = listen._realPortNumber # what's the right call for this? self.updateResource = 'http://localhost:%s/update' % port log.info("listening on %s" % port) - self.register() + self.register(label) def updateOnPatch(self, p): for func in self._watchers.whoCares(p): self.addHandler(func) - def register(self): + def register(self, label): def done(x): print "registered", x.body @@ -166,7 +103,8 @@ class SyncedGraph(object): url='http://localhost:8051/graphClients', method='POST', headers={'Content-Type': ['application/x-www-form-urlencoded']}, - postdata=urllib.urlencode([('clientUpdate', self.updateResource)]), + postdata=urllib.urlencode([('clientUpdate', self.updateResource), + ('label', label)]), ).addCallbacks(done, log.error) log.info("registering with rdfdb")