Mercurial > code > home > repos > light9
changeset 797:904913de4599
deletes are now quads. refactor files. named clients. auto client port
Ignore-this: 44f83643c28cbb0f961e2c8c1267d398
author | drewp@bigasterisk.com |
---|---|
date | Fri, 13 Jul 2012 19:25:03 +0000 |
parents | 37d05bd17b10 |
children | 5c158d37f1ce |
files | bin/clientdemo bin/rdfdb light9/rdfdb.py light9/rdfdb.xhtml light9/rdfdb/__init__.py light9/rdfdb/graphfile.py light9/rdfdb/patch.py light9/rdfdb/syncedgraph.py |
diffstat | 7 files changed, 313 insertions(+), 276 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/clientdemo Fri Jul 13 18:25:34 2012 +0000 +++ b/bin/clientdemo Fri Jul 13 19:25:03 2012 +0000 @@ -5,14 +5,14 @@ from twisted.internet import reactor import cyclone.web, cyclone.httpclient, logging from rdflib import Namespace, Literal -from light9 import rdfdb +from light9.rdfdb.patch import Patch +from light9.rdfdb.syncedgraph import SyncedGraph if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() - port = 8052 - g = rdfdb.SyncedGraph(port) + g = SyncedGraph("clientdemo") L9 = Namespace("http://light9.bigasterisk.com/") def updateDemoValue(): @@ -22,8 +22,8 @@ g.addHandler(updateDemoValue) def adj(): - g.patch(rdfdb.Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), L9['clientdemo'])], - delTriples=[])) + g.patch(Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), + L9['clientdemo'])], + delQuads=[])) reactor.callLater(2, adj) - reactor.run()
--- a/bin/rdfdb Fri Jul 13 18:25:34 2012 +0000 +++ b/bin/rdfdb Fri Jul 13 19:25:03 2012 +0000 @@ -21,15 +21,11 @@ Maybe some subgraphs are for transient data (e.g. current timecode, mouse position in curvecalc) that only some listeners want to hear about. -Deletes aren't graph-specific, they affect all graphs at once, since -this seems much less confusing to the caller trying to delete a -statement. But, this may lead to weird things when two graphs have the -same statement, and then one deletes it. Or when deleting a stmt that -you see in file1 causes an edit to file2. This plan is making it hard -to invert a patch, so it's about to change. +Deletes are graph-specific, so callers may be surprised to delete a +stmt from one graph but then find that statement is still true. -Alternate plan for deletes: insist that every patch is only within one -subgraph, and just leave dup statements from other graphs alone. +Alternate plan: would it help to insist that every patch is within +only one subgraph? I think it's ok for them to span multiple ones. Inserts can be made on any subgraphs, and each subgraph is saved in its own file. 
The file might not be in a format that can express @@ -78,16 +74,17 @@ """ from twisted.internet import reactor +import twisted.internet.error import sys, optparse, logging, json, os import cyclone.web, cyclone.httpclient, cyclone.websocket -from rdflib import URIRef sys.path.append(".") from light9 import networking, showconfig from rdflib import ConjunctiveGraph, URIRef, Graph -from light9 import rdfdb +from light9.rdfdb.graphfile import GraphFile +from light9.rdfdb.patch import Patch, ALLSTMTS +from light9.rdfdb import syncedgraph from twisted.internet.inotify import INotify -from twisted.python.filepath import FilePath logging.basicConfig(level=logging.DEBUG) log = logging.getLogger() @@ -100,46 +97,28 @@ pass class Client(object): - def __init__(self, updateUri, db): + """ + one of our syncedgraph clients + """ + def __init__(self, updateUri, label, db): self.db = db + self.label = label self.updateUri = updateUri self.sendAll() + def __repr__(self): + return "<%s client at %s>" % (self.label, self.updateUri) + def sendAll(self): """send the client the whole graph contents""" - log.info("sending all graphs to %s" % self.updateUri) - self.sendPatch(rdfdb.Patch( - addQuads=self.db.graph.quads(rdfdb.ALLSTMTS), - delTriples=[])) + log.info("sending all graphs to %s at %s" % + (self.label, self.updateUri)) + self.sendPatch(Patch( + addQuads=self.db.graph.quads(ALLSTMTS), + delQuads=[])) def sendPatch(self, p): - rdfdb.sendPatch(self.updateUri, p) - # err something if the client is gone, so it can be dropped - # from the list - -class GraphFile(object): - def __init__(self, notifier, path, uri, patch, getSubgraph): - self.path, self.uri = path, uri - self.patch, self.getSubgraph = patch, getSubgraph - - notifier.watch(FilePath(path), callbacks=[self.notify]) - self.reread() - - def notify(self, notifier, filepath, mask): - log.info("file %s changed" % filepath) - self.reread() - - def reread(self): - """update tha graph with any diffs from this file""" - old = 
self.getSubgraph(self.uri) - new = Graph() - new.parse(location=self.path, format='n3') - - adds = [(s,p,o,self.uri) for s,p,o in new-old] - dels = [(s,p,o) for s,p,o in old-new] - - if adds or dels: - self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels)) + return syncedgraph.sendPatch(self.updateUri, p) class Db(object): def __init__(self): @@ -162,18 +141,28 @@ apply this patch to the master graph then notify everyone about it """ log.info("patching graph with %s adds %s dels" % - (len(p.addQuads), len(p.delTriples))) - for s in p.delTriples: - self.graph.remove(s) + (len(p.addQuads), len(p.delQuads))) + + + for spoc in p.delQuads: + # probably need to insist that these existed, or else cull + # the ones that didn't exist, to make the patch invert right + self.graph.get_context(spoc[3]).remove(spoc[:3]) addQuads = p.addQuads[:2] # test self.graph.addN(addQuads) self.summarizeToLog() for c in self.clients: - c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)) + d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads)) + d.addErrback(self.clientErrored, c) sendToLiveClients(asJson=p.jsonRepr) + def clientErrored(self, err, c): + err.trap(twisted.internet.error.ConnectError) + log.info("connection error- dropping client %r" % c) + self.clients.remove(c) + def summarizeToLog(self): log.info("contexts in graph %s:" % len(self.graph)) for c in self.graph.contexts(): @@ -185,21 +174,22 @@ #return self.graph.get_context(uri) g = Graph() - for s in self.graph.triples(rdfdb.ALLSTMTS, uri): + for s in self.graph.triples(ALLSTMTS, uri): g.add(s) return g - def addClient(self, updateUri): + def addClient(self, updateUri, label): [self.clients.remove(c) for c in self.clients if c.updateUri == updateUri] - log.info("new client from %s" % updateUri) - self.clients.append(Client(updateUri, self)) + log.info("new client %s at %s" % (label, updateUri)) + self.clients.append(Client(updateUri, label, self)) self.sendClientsToAllLivePages() def 
sendClientsToAllLivePages(self): - sendToLiveClients({"clients":[c.updateUri for c in self.clients]}) - + sendToLiveClients({"clients":[ + dict(updateUri=c.updateUri, label=c.label) + for c in self.clients]}) class Index(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): @@ -213,7 +203,7 @@ class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): def __init__(self, *args, **kw): cyclone.web.RequestHandler.__init__(self, *args, **kw) - p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch) + p = syncedgraph.makePatchEndpointPutMethod(self.settings.db.patch) self.put = lambda: p(self) def get(self): @@ -227,7 +217,7 @@ def post(self): upd = self.get_argument("clientUpdate") try: - self.settings.db.addClient(upd) + self.settings.db.addClient(upd, self.get_argument("label")) except: import traceback traceback.print_exc()
--- a/light9/rdfdb.py Fri Jul 13 18:25:34 2012 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,214 +0,0 @@
-from rdflib import ConjunctiveGraph, Graph
-import json, logging, cyclone.httpclient, traceback, urllib
-from twisted.internet import reactor
-log = logging.getLogger()
-
-ALLSTMTS = (None, None, None)
-
-class Patch(object):
-    """
-    the json representation includes the {"patch":...} wrapper
-    """
-    def __init__(self, jsonRepr=None, addQuads=None, delTriples=None,
-                 addGraph=None, delGraph=None):
-        self._jsonRepr = jsonRepr
-        self._addQuads, self._delTriples = addQuads, delTriples
-        self._addGraph, self._delGraph = addGraph, delGraph
-
-        if self._jsonRepr is not None:
-            body = json.loads(self._jsonRepr)
-            self._delGraph = Graph()
-            self._delGraph.parse(data=body['patch']['deletes'], format='nt')
-            self._addGraph = ConjunctiveGraph()
-            self._addGraph.parse(data=body['patch']['adds'], format='nquads')
-
-    @property
-    def addQuads(self):
-        if self._addQuads is None:
-            if self._addGraph is not None:
-                self._addQuads = list(self._addGraph.quads(ALLSTMTS))
-            else:
-                raise
-        return self._addQuads
-
-    @property
-    def delTriples(self):
-        if self._delTriples is None:
-            if self._delGraph is not None:
-                self._delTriples = list(self._delGraph.triples(ALLSTMTS))
-            else:
-                raise
-        return self._delTriples
-
-    @property
-    def addGraph(self):
-        if self._addGraph is None:
-            raise
-        return self._addGraph
-
-    @property
-    def delGraph(self):
-        if self._delGraph is None:
-            raise
-        return self._delGraph
-
-    @property
-    def jsonRepr(self):
-        if self._jsonRepr is None:
-            addGraph = ConjunctiveGraph()
-            #addGraph.addN(addQuads) # no effect on nquad output
-            for s,p,o,c in self.addQuads:
-                addGraph.get_context(c).add((s,p,o))
-                #addGraph.store.add((s,p,o), c) # no effect on nquad output
-            delGraph = Graph()
-            for s in self.delTriples:
-                delGraph.add(s)
-            self._jsonRepr = json.dumps({"patch": {
-                'adds':addGraph.serialize(format='nquads'),
-                'deletes':delGraph.serialize(format='nt'),
-                }})
-        return self._jsonRepr
-
-def sendPatch(putUri, patch):
-
-    # this will take args for sender, etc
-    body = patch.jsonRepr
-    log.debug("send body: %r" % body)
-    def ok(done):
-        if not str(done.code).startswith('2'):
-            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
-        log.debug("sendPatch finished, response: %s" % done.body)
-        return done
-
-    def err(e):
-        log.warn("sendPatch failed %r" % e)
-        raise e
-
-    return cyclone.httpclient.fetch(
-        url=putUri,
-        method='PUT',
-        headers={'Content-Type': ['application/json']},
-        postdata=body,
-        ).addCallbacks(ok, err)
-
-def makePatchEndpointPutMethod(cb):
-    def put(self):
-        try:
-            p = Patch(jsonRepr=self.request.body)
-            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
-            cb(p)
-        except:
-            traceback.print_exc()
-            raise
-    return put
-
-def makePatchEndpoint(cb):
-    class Update(cyclone.web.RequestHandler):
-        put = makePatchEndpointPutMethod(cb)
-    return Update
-
-class GraphWatchers(object):
-    def __init__(self):
-        self._handlersSp = {} # (s,p): set(handlers)
-
-    def addSubjPredWatcher(self, func, s, p):
-        if func is None:
-            return
-        key = s, p
-        self._handlersSp.setdefault(key, set()).add(func)
-
-    def whoCares(self, p):
-        """what functions would care about the changes in this patch"""
-        ret = set()
-        for s in self._handlersSp.values():
-            ret.update(s)
-        return ret
-
-class SyncedGraph(object):
-    """
-    api like rdflib.Graph which sends updates to rdfdb and can call
-    you back when there are graph changes
-    """
-    def __init__(self, port):
-        _graph = self._graph = ConjunctiveGraph()
-        self._watchers = GraphWatchers()
-
-        #then i try adding a statement that i will react to if i see it
-        #then i print updates to that statement as they come
-        #and the statement has a PID in it so we can see two clientdemos competing
-        #then factor out this client, and have real light9 tools start using it to build their graphs
-        #they just do full reload on relevant subgraphs at first, get progressively better
-
-        def onPatch(p):
-            for s in p.delGraph:
-                _graph.remove(s)
-            _graph.addN(p.addGraph.quads(ALLSTMTS))
-            log.info("graph now has %s statements" % len(_graph))
-            self.updateOnPatch(p)
-
-        reactor.listenTCP(port, cyclone.web.Application(handlers=[
-            (r'/update', makePatchEndpoint(onPatch)),
-            ]))
-        self.updateResource = 'http://localhost:%s/update' % port
-        log.info("listening on %s" % port)
-        self.register()
-
-    def updateOnPatch(self, p):
-        for func in self._watchers.whoCares(p):
-            self.addHandler(func)
-
-    def register(self):
-
-        def done(x):
-            print "registered", x.body
-
-        cyclone.httpclient.fetch(
-            url='http://localhost:8051/graphClients',
-            method='POST',
-            headers={'Content-Type': ['application/x-www-form-urlencoded']},
-            postdata=urllib.urlencode([('clientUpdate', self.updateResource)]),
-            ).addCallbacks(done, log.error)
-        log.info("registering with rdfdb")
-
-    def patch(self, p):
-        """send this patch to the server and apply it to our local graph and run handlers"""
-        # currently this has to round-trip. But I could apply the
-        # patch here and have the server not bounce it back to me
-        return sendPatch('http://localhost:8051/patches', p)
-
-    def addHandler(self, func):
-        """
-        run this (idempotent) func, noting what graph values it
-        uses. Run it again in the future if there are changes to those
-        graph values. The func might use different values during that
-        future call, and those will be what we watch for next.
-        """
-
-        # if we saw this func before, we need to forget the old
-        # callbacks it wanted and replace with the new ones we see
-        # now.
-
-        # if one handler func calls another, does that break anything?
-        # maybe not?
-
-        # no plan for sparql queries yet. Hook into a lower layer that
-        # reveals all their statement fetches? Just make them always
-        # new? Cache their results, so if i make the query again and
-        # it gives the same result, I don't call the handler?
-
-        self.currentFunc = func
-        try:
-            func()
-        finally:
-            self.currentFunc = None
-
-    # these just call through to triples() so it might be possible to
-    # watch just that one
-    def value(self, subj, pred):
-        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
-        return self._graph.value(subj, pred)
-
-    def objects(self, subject=None, predicate=None):
-        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
-        return self._graph.objects(subject, predicate)
-
--- a/light9/rdfdb.xhtml Fri Jul 13 18:25:34 2012 +0000 +++ b/light9/rdfdb.xhtml Fri Jul 13 19:25:03 2012 +0000 @@ -22,6 +22,14 @@ .patch .deletes { color: #DC6F6F; } + #out { + white-space: pre-wrap; + } + .patch fieldset { + color: gray; + font-family: arial; + font-size: 75%; + } /* ]]> */ </style> </head> @@ -64,7 +72,7 @@ ); } - $('#out').append($('<div>').text(JSON.stringify(evt.data))); + $('#out').append($('<div>').text(evt.data)); }; }); // ]]>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/graphfile.py Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,33 @@
+import logging
+from twisted.python.filepath import FilePath
+from rdflib import Graph
+from light9.rdfdb.patch import Patch
+
+log = logging.getLogger()
+
+class GraphFile(object):
+    """
+    one rdf file that we read from, write to, and notice external changes to
+    """
+    def __init__(self, notifier, path, uri, patch, getSubgraph):
+        self.path, self.uri = path, uri
+        self.patch, self.getSubgraph = patch, getSubgraph
+
+        notifier.watch(FilePath(path), callbacks=[self.notify])
+        self.reread()
+
+    def notify(self, notifier, filepath, mask):
+        log.info("file %s changed" % filepath)
+        self.reread()
+
+    def reread(self):
+        """update the graph with any diffs from this file"""
+        old = self.getSubgraph(self.uri)
+        new = Graph()
+        new.parse(location=self.path, format='n3')
+
+        adds = [(s, p, o, self.uri) for s, p, o in new - old]
+        dels = [(s, p, o, self.uri) for s, p, o in old - new]
+
+        if adds or dels:
+            self.patch(Patch(addQuads=adds, delQuads=dels))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/patch.py Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,68 @@
+import json
+from rdflib import ConjunctiveGraph
+
+ALLSTMTS = (None, None, None)
+
+def graphFromQuads(q):
+    g = ConjunctiveGraph()
+    #g.addN(q) # no effect on nquad output
+    for s,p,o,c in q:
+        g.get_context(c).add((s,p,o))
+        #g.store.add((s,p,o), c) # no effect on nquad output
+    return g
+
+class Patch(object):
+    """
+    the json representation includes the {"patch":...} wrapper
+    """
+    def __init__(self, jsonRepr=None, addQuads=None, delQuads=None,
+                 addGraph=None, delGraph=None):
+        self._jsonRepr = jsonRepr
+        self._addQuads, self._delQuads = addQuads, delQuads
+        self._addGraph, self._delGraph = addGraph, delGraph
+
+        if self._jsonRepr is not None:
+            body = json.loads(self._jsonRepr)
+            self._delGraph = ConjunctiveGraph()
+            self._delGraph.parse(data=body['patch']['deletes'], format='nquads')
+            self._addGraph = ConjunctiveGraph()
+            self._addGraph.parse(data=body['patch']['adds'], format='nquads')
+
+    @property
+    def addQuads(self):
+        if self._addQuads is None:
+            if self._addGraph is not None:
+                self._addQuads = list(self._addGraph.quads(ALLSTMTS))
+            else:
+                raise
+        return self._addQuads
+
+    @property
+    def delQuads(self):
+        if self._delQuads is None:
+            if self._delGraph is not None:
+                self._delQuads = list(self._delGraph.quads(ALLSTMTS))
+            else:
+                raise
+        return self._delQuads
+
+    @property
+    def addGraph(self):
+        if self._addGraph is None:
+            self._addGraph = graphFromQuads(self._addQuads)
+        return self._addGraph
+
+    @property
+    def delGraph(self):
+        if self._delGraph is None:
+            self._delGraph = graphFromQuads(self._delQuads)
+        return self._delGraph
+
+    @property
+    def jsonRepr(self):
+        if self._jsonRepr is None:
+            self._jsonRepr = json.dumps({"patch": {
+                'adds':self.addGraph.serialize(format='nquads'),
+                'deletes':self.delGraph.serialize(format='nquads'),
+                }})
+        return self._jsonRepr
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/syncedgraph.py Fri Jul 13 19:25:03 2012 +0000
@@ -0,0 +1,152 @@
+from rdflib import ConjunctiveGraph
+import logging, cyclone.httpclient, traceback, urllib
+from twisted.internet import reactor
+log = logging.getLogger()
+from light9.rdfdb.patch import Patch, ALLSTMTS
+
+def sendPatch(putUri, patch):
+    # this will take args for sender, etc
+    body = patch.jsonRepr
+    log.debug("send body: %r" % body)
+    def ok(done):
+        if not str(done.code).startswith('2'):
+            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
+        log.debug("sendPatch finished, response: %s" % done.body)
+        return done
+
+    return cyclone.httpclient.fetch(
+        url=putUri,
+        method='PUT',
+        headers={'Content-Type': ['application/json']},
+        postdata=body,
+        ).addCallback(ok)
+
+def makePatchEndpointPutMethod(cb):
+    def put(self):
+        try:
+            p = Patch(jsonRepr=self.request.body)
+            log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph)))
+            cb(p)
+        except:
+            traceback.print_exc()
+            raise
+    return put
+
+def makePatchEndpoint(cb):
+    class Update(cyclone.web.RequestHandler):
+        put = makePatchEndpointPutMethod(cb)
+    return Update
+
+class GraphWatchers(object):
+    def __init__(self):
+        self._handlersSp = {} # (s,p): set(handlers)
+
+    def addSubjPredWatcher(self, func, s, p):
+        if func is None:
+            return
+        key = s, p
+        self._handlersSp.setdefault(key, set()).add(func)
+
+    def whoCares(self, p):
+        """what functions would care about the changes in this patch"""
+        ret = set()
+        for s in self._handlersSp.values():
+            ret.update(s)
+        return ret
+
+class SyncedGraph(object):
+    """
+    graph for clients to use. Changes are synced with the master graph
+    in the rdfdb process.
+
+    This api is like rdflib.Graph but it can also call you back when
+    there are graph changes to the parts you previously read.
+    """
+    def __init__(self, label):
+        """
+        label is a string that the server will display in association
+        with your connection
+        """
+        _graph = self._graph = ConjunctiveGraph()
+        self._watchers = GraphWatchers()
+
+        def onPatch(p):
+            for spoc in p.delGraph.quads(ALLSTMTS):
+                _graph.get_context(spoc[3]).remove(spoc[:3])
+            _graph.addN(p.addGraph.quads(ALLSTMTS))
+            log.info("graph now has %s statements" % len(_graph))
+            try:
+                self.updateOnPatch(p)
+            except Exception:
+                # don't reflect this back to the server; we did
+                # receive its patch correctly.
+                traceback.print_exc()
+
+        listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[
+            (r'/update', makePatchEndpoint(onPatch)),
+            ]))
+        port = listen._realPortNumber # what's the right call for this?
+        self.updateResource = 'http://localhost:%s/update' % port
+        log.info("listening on %s" % port)
+        self.register(label)
+
+    def updateOnPatch(self, p):
+        for func in self._watchers.whoCares(p):
+            self.addHandler(func)
+
+    def register(self, label):
+
+        def done(x):
+            print "registered", x.body
+
+        cyclone.httpclient.fetch(
+            url='http://localhost:8051/graphClients',
+            method='POST',
+            headers={'Content-Type': ['application/x-www-form-urlencoded']},
+            postdata=urllib.urlencode([('clientUpdate', self.updateResource),
+                                       ('label', label)]),
+            ).addCallbacks(done, log.error)
+        log.info("registering with rdfdb")
+
+    def patch(self, p):
+        """send this patch to the server and apply it to our local graph and run handlers"""
+        # currently this has to round-trip. But I could apply the
+        # patch here and have the server not bounce it back to me
+        return sendPatch('http://localhost:8051/patches', p)
+
+    def addHandler(self, func):
+        """
+        run this (idempotent) func, noting what graph values it
+        uses. Run it again in the future if there are changes to those
+        graph values. The func might use different values during that
+        future call, and those will be what we watch for next.
+        """
+
+        # if we saw this func before, we need to forget the old
+        # callbacks it wanted and replace with the new ones we see
+        # now.
+
+        # if one handler func calls another, does that break anything?
+        # maybe not?
+
+        # no plan for sparql queries yet. Hook into a lower layer that
+        # reveals all their statement fetches? Just make them always
+        # new? Cache their results, so if i make the query again and
+        # it gives the same result, I don't call the handler?
+
+        self.currentFunc = func
+        try:
+            func()
+        finally:
+            self.currentFunc = None
+
+    # these just call through to triples() so it might be possible to
+    # watch just that one
+    def value(self, subj, pred):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
+        return self._graph.value(subj, pred)
+
+    def objects(self, subject=None, predicate=None):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
+        return self._graph.objects(subject, predicate)
+