#!bin/python
"""
other tools POST themselves to here as subscribers to the graph. They
are providing a URL we can PUT to with graph updates.

we immediately PUT them back all the contents of the graph as a bunch
of adds.

later we PUT them back with updates (add/del lists) when there are
changes.

If we fail to reach a registered caller, we forget about it for future
calls. We can PUT empty diffs as a heartbeat to notice disappearing
callers faster.

A caller can submit add/del changes that should be persisted and
broadcast.

Global data undo should probably happen within this service.

Maybe some subgraphs are for transient data (e.g. current timecode,
mouse position in curvecalc) that only some listeners want to hear about.

Deletes aren't graph-specific, they affect all graphs at once, since
this seems much less confusing to the caller trying to delete a
statement. But, this may lead to weird things when two graphs have the
same statement, and then one deletes it. Or when deleting a stmt that
you see in file1 causes an edit to file2. This plan is making it hard
to invert a patch, so it's about to change.

Alternate plan for deletes: insist that every patch is only within one
subgraph, and just leave dup statements from other graphs alone.

Inserts can be made on any subgraphs, and each subgraph is saved in
its own file. The file might not be in a format that can express
graphs, so I'm just going to not store the subgraph URI in any file.

I don't support wildcard deletes, and there are race conditions where a
s-p could end up with unexpected multiple objects. Every client needs
to be ready for this.

We watch the files and push their own changes back to the clients.

Persist our client list, to survive restarts. In another rdf file? A
random json one? memcache? Also hold the recent changes. We're not
logging everything forever, though, since the output files and a VCS
shall be used for that.

Bnodes: this rdfdb graph might be able to track bnodes correctly, and
they make for more compact n3 files. I'm not sure if it's going to be
hard to keep the client bnodes in sync though. File rereads would be
hard, if ever a bnode was used across graphs, so that probably should
not be allowed.

Our API:

GET /               ui
GET /graph          the whole graph (needed? just for ui browsing?)
PUT /patches        clients submit changes
GET /patches        (recent) patches from clients
POST /graphClients  clientUpdate={uri} to subscribe
GET /graphClients   current clients

format:
json {"adds" : [[quads]...],
      "deletes": [[quads]],
      "from" : tooluri,
      "created":tttt
     }
maybe use some http://json-ld.org/ in there.

Our web ui:

registered clients

recent edits, each one says what client it came from. You can reverse
them here.

"""
from twisted.internet import reactor
import sys, optparse, logging, json, os
import traceback
import cyclone.web, cyclone.httpclient, cyclone.websocket
from rdflib import URIRef
# must run before the light9 imports so the package is found when this
# script is started from the project root
sys.path.append(".")
from light9 import networking, showconfig
from rdflib import ConjunctiveGraph, URIRef, Graph
from light9 import rdfdb

from twisted.internet.inotify import INotify
from twisted.python.filepath import FilePath
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger()

try:
    import sys
    sys.path.append("../homeauto/lib")
    from cycloneerr import PrettyErrorHandler
except ImportError:
    # cycloneerr is optional; without it the handlers just get an
    # empty mixin
    class PrettyErrorHandler(object):
        pass


class Client(object):
    """
    one subscriber to the graph, identified by the URI that patches
    are sent to
    """
    def __init__(self, updateUri, db):
        """
        updateUri: where patches for this client are sent
        db: the Db whose graph this client mirrors

        Constructing a Client immediately sends it the whole graph.
        """
        self.db = db
        self.updateUri = updateUri
        self.sendAll()

    def sendAll(self):
        """send the client the whole graph contents"""
        log.info("sending all graphs to %s", self.updateUri)
        self.sendPatch(rdfdb.Patch(
            addQuads=self.db.graph.quads(rdfdb.ALLSTMTS),
            delTriples=[]))

    def sendPatch(self, p):
        """send patch p to this client's updateUri"""
        rdfdb.sendPatch(self.updateUri, p)
        # err something if the client is gone, so it can be dropped
        # from the list


class GraphFile(object):
    """
    one n3 file on disk that backs one subgraph; the file is watched
    and its changes are turned into patches
    """
    def __init__(self, notifier, path, uri, patch, getSubgraph):
        """
        notifier: object with a watch(FilePath, callbacks=[...]) method
          (Db passes an INotify)
        path: the n3 file to read and watch
        uri: context URI under which this file's statements are stored
        patch: callable taking an rdfdb.Patch to apply
        getSubgraph: callable taking a context uri and returning the
          current Graph for it

        The file is read once right away.
        """
        self.path, self.uri = path, uri
        self.patch, self.getSubgraph = patch, getSubgraph

        notifier.watch(FilePath(path), callbacks=[self.notify])
        self.reread()

    def notify(self, notifier, filepath, mask):
        """watch callback: the file changed, so diff it again"""
        log.info("file %s changed", filepath)
        self.reread()

    def reread(self):
        """update the graph with any diffs from this file"""
        old = self.getSubgraph(self.uri)
        new = Graph()
        new.parse(location=self.path, format='n3')

        # adds carry this file's context; deletes are plain triples
        adds = [(s, p, o, self.uri) for s, p, o in new - old]
        dels = [(s, p, o) for s, p, o in old - new]

        if adds or dels:
            self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels))


class Db(object):
    """
    the master graph, the files that feed it, and the list of clients
    that get told about every patch
    """
    def __init__(self):
        self.clients = []
        self.graph = ConjunctiveGraph()

        notifier = INotify()
        notifier.startReading()

        # keep every GraphFile, not just the last one made in the loop
        self.graphFiles = []
        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
            graphFile = GraphFile(notifier,
                                  inFile,
                                  URIRef("http://example.com/%s" %
                                         os.path.basename(inFile)),
                                  self.patch,
                                  self.getSubgraph)
            self.graphFiles.append(graphFile)
            # older attribute name; still ends up as the last GraphFile
            self.g = graphFile

    def patch(self, p):
        """
        apply this patch to the master graph then notify everyone about it
        """
        log.info("patching graph with %s adds %s dels",
                 len(p.addQuads), len(p.delTriples))
        for s in p.delTriples:
            self.graph.remove(s)

        # apply and forward *all* adds; this used to be cut down to the
        # first two quads by a leftover debugging slice
        addQuads = p.addQuads

        self.graph.addN(addQuads)
        self.summarizeToLog()
        for c in self.clients:
            c.sendPatch(rdfdb.Patch(addQuads=addQuads,
                                    delTriples=p.delTriples))
        sendToLiveClients(asJson=p.jsonRepr)

    def summarizeToLog(self):
        """log the size of each context in the master graph"""
        # NOTE(review): len(self.graph) looks like a statement count, not
        # a count of contexts, so the first message may be misleading --
        # confirm before changing the text
        log.info("contexts in graph %s:", len(self.graph))
        for c in self.graph.contexts():
            log.info("  %s: %s statements",
                     c.identifier, len(self.getSubgraph(c.identifier)))

    def getSubgraph(self, uri):
        """return a new Graph holding a copy of the statements in context uri"""
        # this is returning an empty Graph :(
        #return self.graph.get_context(uri)

        g = Graph()
        for s in self.graph.triples(rdfdb.ALLSTMTS, uri):
            g.add(s)
        return g

    def addClient(self, updateUri):
        """
        register a client at updateUri, replacing any existing client
        that has the same uri, then tell the live pages about it
        """
        # build a new list; removing from the list while looping over it
        # can skip entries
        self.clients = [c for c in self.clients
                        if c.updateUri != updateUri]

        log.info("new client from %s", updateUri)
        self.clients.append(Client(updateUri, self))
        self.sendClientsToAllLivePages()

    def sendClientsToAllLivePages(self):
        """push the current list of client uris to every live websocket"""
        sendToLiveClients({"clients": [c.updateUri for c in self.clients]})


class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        """serve the web ui page"""
        self.set_header("Content-Type", "application/xhtml+xml")
        # 'with' so the file gets closed after each request
        with open("light9/rdfdb.xhtml") as f:
            self.write(f.read())


class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        # not implemented yet
        pass


class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
    def __init__(self, *args, **kw):
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
        # PUT is handled by the endpoint rdfdb builds around db.patch
        p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch)
        self.put = lambda: p(self)

    def get(self):
        # not implemented yet
        pass


class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
    def get(self):
        # not implemented yet
        pass

    def post(self):
        """subscribe the uri given in the clientUpdate argument"""
        upd = self.get_argument("clientUpdate")
        try:
            self.settings.db.addClient(upd)
        except Exception:
            # print the traceback here too, then let the error continue
            # to the normal handling
            traceback.print_exc()
            raise


# websocket connections from open web ui pages
liveClients = set()


def sendToLiveClients(d=None, asJson=None):
    """
    send one message to every live websocket: asJson if that is given
    (and non-empty), otherwise d dumped as json
    """
    j = asJson or json.dumps(d)
    for c in liveClients:
        c.sendMessage(j)


class Live(cyclone.websocket.WebSocketHandler):
    """websocket used by the web ui for live updates"""

    def connectionMade(self, *args, **kwargs):
        log.info("ws opened")
        liveClients.add(self)
        self.settings.db.sendClientsToAllLivePages()

    def connectionLost(self, reason):
        log.info("ws closed")
        # discard: no KeyError if this socket was never added
        liveClients.discard(self)

    def messageReceived(self, message):
        # incoming messages are just echoed back
        log.info("got message %s", message)
        self.sendMessage(message)


if __name__ == "__main__":
    logging.basicConfig()
    log = logging.getLogger()

    parser = optparse.OptionParser()
    parser.add_option('--show',
        help='show URI, like http://light9.bigasterisk.com/show/dance2008',
        default=showconfig.showUri())
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    (options, args) = parser.parse_args()

    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    if not options.show:
        raise ValueError("missing --show http://...")

    db = Db()

    port = 8051
    reactor.listenTCP(port, cyclone.web.Application(handlers=[
        (r'/', Index),
        (r'/live', Live),
        (r'/graph', GraphResource),
        (r'/patches', Patches),
        (r'/graphClients', GraphClients),

        (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
         dict(path='lib')),

        ], db=db))
    log.info("serving on %s", port)
    reactor.run()