diff --git a/bin/clientdemo b/bin/clientdemo new file mode 100644 --- /dev/null +++ b/bin/clientdemo @@ -0,0 +1,29 @@ +#!bin/python + +import os, sys +sys.path.append(".") +from twisted.internet import reactor +import cyclone.web, cyclone.httpclient, logging +from rdflib import Namespace, Literal +from light9 import rdfdb + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + log = logging.getLogger() + + port = 8052 + g = rdfdb.SyncedGraph(port) + + L9 = Namespace("http://light9.bigasterisk.com/") + def updateDemoValue(): + v = list(g.objects(L9['demo'], L9['is'])) + print "demo value is %r" % v + + g.addHandler(updateDemoValue) + + def adj(): + g.patch(rdfdb.Patch(addQuads=[(L9['demo'], L9['is'], Literal(os.getpid()), L9['clientdemo'])], + delTriples=[])) + reactor.callLater(2, adj) + + reactor.run() diff --git a/bin/rdfdb b/bin/rdfdb new file mode 100644 --- /dev/null +++ b/bin/rdfdb @@ -0,0 +1,289 @@ +#!bin/python +""" +other tools POST themselves to here as subscribers to the graph. They +are providing a URL we can PUT to with graphs updates. + +we immediately PUT them back all the contents of the graph as a bunch +of adds. + +later we PUT them back with updates (add/del lists) when there are +changes. + +If we fail to reach a registered caller, we forget about it for future +calls. We can PUT empty diffs as a heartbeat to notice disappearing +callers faster. + +A caller can submit add/del changes that should be persisted and +broadcast. + +Global data undo should probably happen within this service. + +Maybe some subgraphs are for transient data (e.g. current timecode, +mouse position in curvecalc) that only some listeners want to hear about. + +Deletes aren't graph-specific, they affect all graphs at once, since +this seems much less confusing to the caller trying to delete a +statement. But, this may lead to weird things when two graphs have the +same statement, and then one deletes it. Or when deleting a stmt that +you see in file1 causes an edit to file2. This plan is making it hard +to invert a patch, so it's about to change. + +Alternate plan for deletes: insist that every patch is only within one +subgraph, and just leave dup statements from other graphs alone. + +Inserts can be made on any subgraphs, and each subgraph is saved in +its own file. The file might not be in a format that can express +graphs, so I'm just going to not store the subgraph URI in any file. + +I don't support wildcard deletes, and there are race conditions where a +s-p could end up with unexpected multiple objects. Every client needs +to be ready for this. + +We watch the files and push their own changes back to the clients. + +Persist our client list, to survive restarts. In another rdf file? A +random json one? memcache? Also hold the recent changes. We're not +logging everything forever, though, since the output files and a VCS +shall be used for that + +Bnodes: this rdfdb graph might be able to track bnodes correctly, and +they make for more compact n3 files. I'm not sure if it's going to be +hard to keep the client bnodes in sync though. File rereads would be +hard,if ever a bnode was used across graphs, so that probably should +not be allowed. + +Our API: + +GET / ui +GET /graph the whole graph (needed? just for ui browsing?) +PUT /patches clients submit changes +GET /patches (recent) patches from clients +POST /graphClients clientUpdate={uri} to subscribe +GET /graphClients current clients + +format: +json {"adds" : [[quads]...], + "deletes": [[quads]], + "from" : tooluri, + "created":tttt + } +maybe use some http://json-ld.org/ in there. + +Our web ui: + +registered clients + +recent edits, each one says what client it came from. You can reverse +them here. + +""" +from twisted.internet import reactor +import sys, optparse, logging, json, os +import cyclone.web, cyclone.httpclient, cyclone.websocket +from rdflib import URIRef +sys.path.append(".") +from light9 import networking, showconfig +from rdflib import ConjunctiveGraph, URIRef, Graph +from light9 import rdfdb + +from twisted.internet.inotify import INotify +from twisted.python.filepath import FilePath +logging.basicConfig(level=logging.DEBUG) +log = logging.getLogger() + +try: + import sys + sys.path.append("../homeauto/lib") + from cycloneerr import PrettyErrorHandler +except ImportError: + class PrettyErrorHandler(object): + pass + +class Client(object): + def __init__(self, updateUri, db): + self.db = db + self.updateUri = updateUri + self.sendAll() + + def sendAll(self): + """send the client the whole graph contents""" + log.info("sending all graphs to %s" % self.updateUri) + self.sendPatch(rdfdb.Patch( + addQuads=self.db.graph.quads(rdfdb.ALLSTMTS), + delTriples=[])) + + def sendPatch(self, p): + rdfdb.sendPatch(self.updateUri, p) + # err something if the client is gone, so it can be dropped + # from the list + +class GraphFile(object): + def __init__(self, notifier, path, uri, patch, getSubgraph): + self.path, self.uri = path, uri + self.patch, self.getSubgraph = patch, getSubgraph + + notifier.watch(FilePath(path), callbacks=[self.notify]) + self.reread() + + def notify(self, notifier, filepath, mask): + log.info("file %s changed" % filepath) + self.reread() + + def reread(self): + """update tha graph with any diffs from this file""" + old = self.getSubgraph(self.uri) + new = Graph() + new.parse(location=self.path, format='n3') + + adds = [(s,p,o,self.uri) for s,p,o in new-old] + dels = [(s,p,o) for s,p,o in old-new] + + if adds or dels: + self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels)) + +class Db(object): + def __init__(self): + self.clients = [] + self.graph = ConjunctiveGraph() + + notifier = INotify() + notifier.startReading() + + for inFile in ["show/dance2012/config.n3", "demo.n3"]: + self.g = GraphFile(notifier, + inFile, + URIRef("http://example.com/%s" % + os.path.basename(inFile)), + self.patch, + self.getSubgraph) + + def patch(self, p): + """ + apply this patch to the master graph then notify everyone about it + """ + log.info("patching graph with %s adds %s dels" % + (len(p.addQuads), len(p.delTriples))) + for s in p.delTriples: + self.graph.remove(s) + + addQuads = p.addQuads[:2] # test + + self.graph.addN(addQuads) + self.summarizeToLog() + for c in self.clients: + c.sendPatch(rdfdb.Patch(addQuads=addQuads, delTriples=p.delTriples)) + sendToLiveClients(asJson=p.jsonRepr) + + def summarizeToLog(self): + log.info("contexts in graph %s:" % len(self.graph)) + for c in self.graph.contexts(): + log.info(" %s: %s statements" % + (c.identifier, len(self.getSubgraph(c.identifier)))) + + def getSubgraph(self, uri): + # this is returning an empty Graph :( + #return self.graph.get_context(uri) + + g = Graph() + for s in self.graph.triples(rdfdb.ALLSTMTS, uri): + g.add(s) + return g + + def addClient(self, updateUri): + [self.clients.remove(c) + for c in self.clients if c.updateUri == updateUri] + + log.info("new client from %s" % updateUri) + self.clients.append(Client(updateUri, self)) + self.sendClientsToAllLivePages() + + def sendClientsToAllLivePages(self): + sendToLiveClients({"clients":[c.updateUri for c in self.clients]}) + + +class Index(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + self.set_header("Content-Type", "application/xhtml+xml") + self.write(open("light9/rdfdb.xhtml").read()) + +class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + pass + +class Patches(PrettyErrorHandler, cyclone.web.RequestHandler): + def __init__(self, *args, **kw): + cyclone.web.RequestHandler.__init__(self, *args, **kw) + p = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch) + self.put = lambda: p(self) + + def get(self): + pass + + +class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): + def get(self): + pass + + def post(self): + upd = self.get_argument("clientUpdate") + try: + self.settings.db.addClient(upd) + except: + import traceback + traceback.print_exc() + raise + +liveClients = set() +def sendToLiveClients(d=None, asJson=None): + j = asJson or json.dumps(d) + for c in liveClients: + c.sendMessage(j) + +class Live(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + log.info("ws opened") + liveClients.add(self) + self.settings.db.sendClientsToAllLivePages() + + def connectionLost(self, reason): + log.info("ws closed") + liveClients.remove(self) + + def messageReceived(self, message): + log.info("got message %s" % message) + self.sendMessage(message) + +if __name__ == "__main__": + logging.basicConfig() + log = logging.getLogger() + + parser = optparse.OptionParser() + parser.add_option('--show', + help='show URI, like http://light9.bigasterisk.com/show/dance2008', + default=showconfig.showUri()) + parser.add_option("-v", "--verbose", action="store_true", + help="logging.DEBUG") + (options, args) = parser.parse_args() + + log.setLevel(logging.DEBUG if options.verbose else logging.INFO) + + if not options.show: + raise ValueError("missing --show http://...") + + db = Db() + + port = 8051 + reactor.listenTCP(port, cyclone.web.Application(handlers=[ + (r'/', Index), + (r'/live', Live), + (r'/graph', GraphResource), + (r'/patches', Patches), + (r'/graphClients', GraphClients), + + (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler, + dict(path='lib')), + + ], db=db)) + log.info("serving on %s" % port) + reactor.run() diff --git a/light9/rdfdb.py b/light9/rdfdb.py new file mode 100644 --- /dev/null +++ b/light9/rdfdb.py @@ -0,0 +1,214 @@ +from rdflib import ConjunctiveGraph, Graph +import json, logging, cyclone.httpclient, traceback, urllib +from twisted.internet import reactor +log = logging.getLogger() + +ALLSTMTS = (None, None, None) + +class Patch(object): + """ + the json representation includes the {"patch":...} wrapper + """ + def __init__(self, jsonRepr=None, addQuads=None, delTriples=None, + addGraph=None, delGraph=None): + self._jsonRepr = jsonRepr + self._addQuads, self._delTriples = addQuads, delTriples + self._addGraph, self._delGraph = addGraph, delGraph + + if self._jsonRepr is not None: + body = json.loads(self._jsonRepr) + self._delGraph = Graph() + self._delGraph.parse(data=body['patch']['deletes'], format='nt') + self._addGraph = ConjunctiveGraph() + self._addGraph.parse(data=body['patch']['adds'], format='nquads') + + @property + def addQuads(self): + if self._addQuads is None: + if self._addGraph is not None: + self._addQuads = list(self._addGraph.quads(ALLSTMTS)) + else: + raise + return self._addQuads + + @property + def delTriples(self): + if self._delTriples is None: + if self._delGraph is not None: + self._delTriples = list(self._delGraph.triples(ALLSTMTS)) + else: + raise + return self._delTriples + + @property + def addGraph(self): + if self._addGraph is None: + raise + return self._addGraph + + @property + def delGraph(self): + if self._delGraph is None: + raise + return self._delGraph + + @property + def jsonRepr(self): + if self._jsonRepr is None: + addGraph = ConjunctiveGraph() + #addGraph.addN(addQuads) # no effect on nquad output + for s,p,o,c in self.addQuads: + addGraph.get_context(c).add((s,p,o)) + #addGraph.store.add((s,p,o), c) # no effect on nquad output + delGraph = Graph() + for s in self.delTriples: + delGraph.add(s) + self._jsonRepr = json.dumps({"patch": { + 'adds':addGraph.serialize(format='nquads'), + 'deletes':delGraph.serialize(format='nt'), + }}) + return self._jsonRepr + +def sendPatch(putUri, patch): + + # this will take args for sender, etc + body = patch.jsonRepr + log.debug("send body: %r" % body) + def ok(done): + if not str(done.code).startswith('2'): + raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body)) + log.debug("sendPatch finished, response: %s" % done.body) + return done + + def err(e): + log.warn("sendPatch failed %r" % e) + raise e + + return cyclone.httpclient.fetch( + url=putUri, + method='PUT', + headers={'Content-Type': ['application/json']}, + postdata=body, + ).addCallbacks(ok, err) + +def makePatchEndpointPutMethod(cb): + def put(self): + try: + p = Patch(jsonRepr=self.request.body) + log.info("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph))) + cb(p) + except: + traceback.print_exc() + raise + return put + +def makePatchEndpoint(cb): + class Update(cyclone.web.RequestHandler): + put = makePatchEndpointPutMethod(cb) + return Update + +class GraphWatchers(object): + def __init__(self): + self._handlersSp = {} # (s,p): set(handlers) + + def addSubjPredWatcher(self, func, s, p): + if func is None: + return + key = s, p + self._handlersSp.setdefault(key, set()).add(func) + + def whoCares(self, p): + """what functions would care about the changes in this patch""" + ret = set() + for s in self._handlersSp.values(): + ret.update(s) + return ret + +class SyncedGraph(object): + """ + api like rdflib.Graph which sends updates to rdfdb and can call + you back when there are graph changes + """ + def __init__(self, port): + _graph = self._graph = ConjunctiveGraph() + self._watchers = GraphWatchers() + + #then i try adding a statement that i will react to if i see it + #then i print updates to that statement as they come + #and the statement has a PID in it so we can see two clientdemos competing + #then factor out this client, and have real light9 tools start using it to build their graphs + #they just do full reload on relevant subgraphs at first, get progressively better + + def onPatch(p): + for s in p.delGraph: + _graph.remove(s) + _graph.addN(p.addGraph.quads(ALLSTMTS)) + log.info("graph now has %s statements" % len(_graph)) + self.updateOnPatch(p) + + reactor.listenTCP(port, cyclone.web.Application(handlers=[ + (r'/update', makePatchEndpoint(onPatch)), + ])) + self.updateResource = 'http://localhost:%s/update' % port + log.info("listening on %s" % port) + self.register() + + def updateOnPatch(self, p): + for func in self._watchers.whoCares(p): + self.addHandler(func) + + def register(self): + + def done(x): + print "registered", x.body + + cyclone.httpclient.fetch( + url='http://localhost:8051/graphClients', + method='POST', + headers={'Content-Type': ['application/x-www-form-urlencoded']}, + postdata=urllib.urlencode([('clientUpdate', self.updateResource)]), + ).addCallbacks(done, log.error) + log.info("registering with rdfdb") + + def patch(self, p): + """send this patch to the server and apply it to our local graph and run handlers""" + # currently this has to round-trip. But I could apply the + # patch here and have the server not bounce it back to me + return sendPatch('http://localhost:8051/patches', p) + + def addHandler(self, func): + """ + run this (idempotent) func, noting what graph values it + uses. Run it again in the future if there are changes to those + graph values. The func might use different values during that + future call, and those will be what we watch for next. + """ + + # if we saw this func before, we need to forget the old + # callbacks it wanted and replace with the new ones we see + # now. + + # if one handler func calls another, does that break anything? + # maybe not? + + # no plan for sparql queries yet. Hook into a lower layer that + # reveals all their statement fetches? Just make them always + # new? Cache their results, so if i make the query again and + # it gives the same result, I don't call the handler? + + self.currentFunc = func + try: + func() + finally: + self.currentFunc = None + + # these just call through to triples() so it might be possible to + # watch just that one + def value(self, subj, pred): + self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred) + return self._graph.value(subj, pred) + + def objects(self, subject=None, predicate=None): + self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate) + return self._graph.objects(subject, predicate) + diff --git a/light9/rdfdb.xhtml b/light9/rdfdb.xhtml new file mode 100644 --- /dev/null +++ b/light9/rdfdb.xhtml @@ -0,0 +1,74 @@ + + + +
+status: starting...
+ +Clients:
+ + + + + + + + \ No newline at end of file