diff --git a/bin/rdfdb b/bin/rdfdb --- a/bin/rdfdb +++ b/bin/rdfdb @@ -21,15 +21,11 @@ Global data undo should probably happen Maybe some subgraphs are for transient data (e.g. current timecode, mouse position in curvecalc) that only some listeners want to hear about. -Deletes aren't graph-specific, they affect all graphs at once, since -this seems much less confusing to the caller trying to delete a -statement. But, this may lead to weird things when two graphs have the -same statement, and then one deletes it. Or when deleting a stmt that -you see in file1 causes an edit to file2. This plan is making it hard -to invert a patch, so it's about to change. +Deletes are graph-specific, so callers may be surprised to delete a +stmt from one graph but then find that statement is still true. -Alternate plan for deletes: insist that every patch is only within one -subgraph, and just leave dup statements from other graphs alone. +Alternate plan: would it help to insist that every patch is within +only one subgraph? I think it's ok for them to span multiple ones. Inserts can be made on any subgraphs, and each subgraph is saved in its own file. The file might not be in a format that can express @@ -78,16 +74,17 @@ them here. """ from twisted.internet import reactor +import twisted.internet.error import sys, optparse, logging, json, os import cyclone.web, cyclone.httpclient, cyclone.websocket -from rdflib import URIRef sys.path.append(".") from light9 import networking, showconfig from rdflib import ConjunctiveGraph, URIRef, Graph -from light9 import rdfdb +from light9.rdfdb.graphfile import GraphFile +from light9.rdfdb.patch import Patch, ALLSTMTS +from light9.rdfdb import syncedgraph from twisted.internet.inotify import INotify -from twisted.python.filepath import FilePath logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() @@ -100,46 +97,28 @@ except ImportError: pass class Client(object): - def __init__(self, updateUri, db): + """ + one of our syncedgraph clients + """ + def __init__(self, updateUri, label, db): self.db = db + self.label = label self.updateUri = updateUri self.sendAll() + def __repr__(self): + return "<%s client at %s>" % (self.label, self.updateUri) + def sendAll(self): """send the client the whole graph contents""" - log.info("sending all graphs to %s" % self.updateUri) - self.sendPatch(rdfdb.Patch( - addQuads=self.db.graph.quads(rdfdb.ALLSTMTS), - delTriples=[])) + log.info("sending all graphs to %s at %s" % + (self.label, self.updateUri)) + self.sendPatch(Patch( + addQuads=self.db.graph.quads(ALLSTMTS), + delQuads=[])) def sendPatch(self, p): - rdfdb.sendPatch(self.updateUri, p) - # err something if the client is gone, so it can be dropped - # from the list - -class GraphFile(object): - def __init__(self, notifier, path, uri, patch, getSubgraph): - self.path, self.uri = path, uri - self.patch, self.getSubgraph = patch, getSubgraph - - notifier.watch(FilePath(path), callbacks=[self.notify]) - self.reread() - - def notify(self, notifier, filepath, mask): - log.info("file %s changed" % filepath) - self.reread() - - def reread(self): - """update tha graph with any diffs from this file""" - old = self.getSubgraph(self.uri) - new = Graph() - new.parse(location=self.path, format='n3') - - adds = [(s,p,o,self.uri) for s,p,o in new-old] - dels = [(s,p,o) for s,p,o in old-new] - - if adds or dels: - self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels)) + return syncedgraph.sendPatch(self.updateUri, p) class Db(object): def __init__(self): @@ -162,18 +141,28 @@ class Db(object): apply this patch to the master graph then notify everyone about it """ log.info("patching graph with %s adds %s dels" % - (len(p.addQuads), len(p.delTriples))) - for s in p.delTriples: - self.graph.remove(s) + (len(p.addQuads), len(p.delQuads))) + + + for spoc in p.delQuads: + # probably need to insist that these existed, or else cull + # the ones that didn't exist, to make the patch invert right + self.graph.get_context(spoc[3]).remove(spoc[:3]) addQuads = p.addQuads[:2] # test self.graph.addN(addQuads) self.summarizeToLog() for c in self.clients: - c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)) + d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads)) + d.addErrback(self.clientErrored, c) sendToLiveClients(asJson=p.jsonRepr) + def clientErrored(self, err, c): + err.trap(twisted.internet.error.ConnectError) + log.info("connection error- dropping client %r" % c) + self.clients.remove(c) + def summarizeToLog(self): log.info("contexts in graph %s:" % len(self.graph)) for c in self.graph.contexts(): @@ -185,21 +174,22 @@ class Db(object): #return self.graph.get_context(uri) g = Graph() - for s in self.graph.triples(rdfdb.ALLSTMTS, uri): + for s in self.graph.triples(ALLSTMTS, uri): g.add(s) return g - def addClient(self, updateUri): + def addClient(self, updateUri, label): [self.clients.remove(c) for c in self.clients if c.updateUri == updateUri] - log.info("new client from %s" % updateUri) - self.clients.append(Client(updateUri, self)) + log.info("new client %s at %s" % (label, updateUri)) + self.clients.append(Client(updateUri, label, self)) self.sendClientsToAllLivePages() def sendClientsToAllLivePages(self): - sendToLiveClients({"clients":[c.updateUri for c in self.clients]}) - + sendToLiveClients({"clients":[ + dict(updateUri=c.updateUri, label=c.label) + for c in self.clients]}) class Index(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): @@ -213,7 +203,7 @@ class GraphResource(PrettyErrorHandler, class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): def __init__(self, *args, **kw): cyclone.web.RequestHandler.__init__(self, *args, **kw) - p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch) + p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch) self.put = lambda: p(self) def get(self): @@ -227,7 +217,7 @@ class GraphClients(PrettyErrorHandler, c def post(self): upd = self.get_argument("clientUpdate") try: - self.settings.db.addClient(upd) + self.settings.db.addClient(upd, self.get_argument("label")) except: import traceback traceback.print_exc()