changeset 796:37d05bd17b10

rdfdb first pass Ignore-this: 8d4935412412160aa53ccc0ab3e46d0e
author drewp@bigasterisk.com
date Fri, 13 Jul 2012 18:25:34 +0000
parents 09026c837ceb
children 904913de4599
files bin/clientdemo bin/rdfdb light9/rdfdb.py light9/rdfdb.xhtml
diffstat 4 files changed, 606 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/clientdemo	Fri Jul 13 18:25:34 2012 +0000
@@ -0,0 +1,29 @@
+#!bin/python
+
+import os, sys
+sys.path.append(".")
+from twisted.internet import reactor
+import cyclone.web, cyclone.httpclient, logging
+from rdflib import Namespace, Literal
+from light9 import rdfdb
+
+if __name__ == "__main__":
+    logging.basicConfig(level=logging.DEBUG)
+    log = logging.getLogger()
+
+    # optional argv[1] picks the port, so two demos can run side by side
+    port = int(sys.argv[1]) if len(sys.argv) > 1 else 8052
+    g = rdfdb.SyncedGraph(port)
+    L9 = Namespace("http://light9.bigasterisk.com/")
+
+    def updateDemoValue():  # prints every object of (demo, is)
+        v = list(g.objects(L9['demo'], L9['is']))
+        print "demo value is %r" % v
+    g.addHandler(updateDemoValue)
+
+    def adj():  # writes our pid as a value of (demo, is), in graph clientdemo
+        ours = (L9['demo'], L9['is'], Literal(os.getpid()), L9['clientdemo'])
+        g.patch(rdfdb.Patch(addQuads=[ours], delTriples=[]))
+    reactor.callLater(2, adj)
+
+    reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/rdfdb	Fri Jul 13 18:25:34 2012 +0000
@@ -0,0 +1,289 @@
+#!bin/python
+"""
+other tools POST themselves to here as subscribers to the graph. They
+are providing a URL we can PUT to with graph updates.
+
+we immediately PUT them back all the contents of the graph as a bunch
+of adds.
+
+later we PUT them back with updates (add/del lists) when there are
+changes.
+
+If we fail to reach a registered caller, we forget about it for future
+calls. We can PUT empty diffs as a heartbeat to notice disappearing
+callers faster.
+
+A caller can submit add/del changes that should be persisted and
+broadcast.
+
+Global data undo should probably happen within this service.
+
+Maybe some subgraphs are for transient data (e.g. current timecode,
+mouse position in curvecalc) that only some listeners want to hear about.
+
+Deletes aren't graph-specific, they affect all graphs at once, since
+this seems much less confusing to the caller trying to delete a
+statement. But, this may lead to weird things when two graphs have the
+same statement, and then one deletes it. Or when deleting a stmt that
+you see in file1 causes an edit to file2. This plan is making it hard
+to invert a patch, so it's about to change.
+
+Alternate plan for deletes: insist that every patch is only within one
+subgraph, and just leave dup statements from other graphs alone.
+
+Inserts can be made on any subgraphs, and each subgraph is saved in
+its own file. The file might not be in a format that can express
+graphs, so I'm just going to not store the subgraph URI in any file.
+
+I don't support wildcard deletes, and there are race conditions where a
+s-p could end up with unexpected multiple objects. Every client needs
+to be ready for this.
+
+We watch the files and push their own changes back to the clients.
+
+Persist our client list, to survive restarts. In another rdf file? A
+random json one? memcache? Also hold the recent changes. We're not
+logging everything forever, though, since the output files and a VCS
+shall be used for that.
+
+Bnodes: this rdfdb graph might be able to track bnodes correctly, and
+they make for more compact n3 files. I'm not sure if it's going to be
+hard to keep the client bnodes in sync though. File rereads would be
+hard, if ever a bnode was used across graphs, so that probably should
+not be allowed.
+
+Our API:
+
+GET /  ui
+GET /graph    the whole graph (needed? just for ui browsing?)
+PUT /patches  clients submit changes
+GET /patches  (recent) patches from clients
+POST /graphClients clientUpdate={uri} to subscribe
+GET /graphClients  current clients
+
+format:
+json {"adds" : [[quads]...],
+      "deletes": [[quads]],
+      "from" : tooluri,
+      "created":tttt
+     }
+maybe use some http://json-ld.org/ in there.
+
+Our web ui:
+
+registered clients
+
+recent edits, each one says what client it came from. You can reverse
+them here.
+
+"""
+from twisted.internet import reactor
+import sys, optparse, logging, json, os
+import cyclone.web, cyclone.httpclient, cyclone.websocket
+from rdflib import URIRef
+sys.path.append(".")
+from light9 import networking, showconfig
+from rdflib import ConjunctiveGraph, URIRef, Graph
+from light9 import rdfdb 
+
+from twisted.internet.inotify import INotify
+from twisted.python.filepath import FilePath
+logging.basicConfig(level=logging.DEBUG)
+log = logging.getLogger()
+
+try:
+    import sys
+    sys.path.append("../homeauto/lib")
+    from cycloneerr import PrettyErrorHandler
+except ImportError:
+    class PrettyErrorHandler(object):
+        pass
+
+class Client(object):
+    """one subscriber, reached by PUTting patches to its updateUri"""
+    def __init__(self, updateUri, db):
+        self.updateUri, self.db = updateUri, db
+        self.sendAll()
+
+    def sendAll(self):
+        """bring the client up to date: one patch that adds every quad"""
+        log.info("sending all graphs to %s" % self.updateUri)
+        everything = self.db.graph.quads(rdfdb.ALLSTMTS)
+        fullGraph = rdfdb.Patch(addQuads=everything, delTriples=[])
+        self.sendPatch(fullGraph)
+
+    def sendPatch(self, p):
+        """PUT patch p to this client"""
+        # todo: notice when the client is gone, so it can be dropped
+        rdfdb.sendPatch(self.updateUri, p)
+
+class GraphFile(object):
+    """one n3 file on disk, kept in sync into the subgraph named uri"""
+    def __init__(self, notifier, path, uri, patch, getSubgraph):
+        self.path, self.uri = path, uri
+        self.patch, self.getSubgraph = patch, getSubgraph
+        # reread whenever the notifier reports on the file, and once right now
+        notifier.watch(FilePath(path), callbacks=[self.notify])
+        self.reread()
+
+    def notify(self, notifier, filepath, mask):
+        log.info("file %s changed" % filepath)
+        self.reread()
+
+    def reread(self):
+        """update the graph with any diffs from this file"""
+        inGraph = self.getSubgraph(self.uri)
+        fromFile = Graph()
+        fromFile.parse(location=self.path, format='n3')
+        # deletes are plain triples; only the adds carry the graph uri
+        dels = [(s,p,o) for s,p,o in inGraph-fromFile]
+        adds = [(s,p,o,self.uri) for s,p,o in fromFile-inGraph]
+        if adds or dels:
+            self.patch(rdfdb.Patch(addQuads=adds, delTriples=dels))
+
+class Db(object):
+    """the master graph, the n3 files loaded into it, and the subscribed clients"""
+    def __init__(self):
+        self.clients = []
+        self.graph = ConjunctiveGraph()
+        notifier = INotify()
+        notifier.startReading()
+        # each file goes into its own subgraph, named after its basename
+        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
+            self.g = GraphFile(notifier,
+                               inFile,
+                               URIRef("http://example.com/%s" %
+                                      os.path.basename(inFile)),
+                               self.patch,
+                               self.getSubgraph)
+
+    def patch(self, p):
+        """
+        apply this patch to the master graph then notify everyone about it
+        """
+        log.info("patching graph with %s adds %s dels" %
+                 (len(p.addQuads), len(p.delTriples)))
+        # deletes are triples with no graph uri; see the module docstring
+        for s in p.delTriples:
+            self.graph.remove(s)
+        # add every quad (this used to keep only p.addQuads[:2], as a test)
+        self.graph.addN(p.addQuads)
+        self.summarizeToLog()
+        for c in self.clients:
+            c.sendPatch(p)
+        sendToLiveClients(asJson=p.jsonRepr)
+
+    def summarizeToLog(self):
+        """log how many statements each context of the graph holds"""
+        log.info("contexts in graph %s:" % len(self.graph))
+        for c in self.graph.contexts():
+            log.info("  %s: %s statements" %
+                     (c.identifier, len(self.getSubgraph(c.identifier))))
+
+    def getSubgraph(self, uri):
+        """return a new Graph holding a copy of the triples in context uri"""
+        # get_context(uri) was returning an empty Graph, so copy by hand
+        g = Graph()
+        for s in self.graph.triples(rdfdb.ALLSTMTS, uri):
+            g.add(s)
+        return g
+
+    def addClient(self, updateUri):
+        """subscribe updateUri, replacing any client already using that uri"""
+        # rebuild the list: removing while iterating over it skips entries
+        self.clients[:] = [c for c in self.clients if c.updateUri != updateUri]
+        log.info("new client from %s" % updateUri)
+        self.clients.append(Client(updateUri, self))
+        self.sendClientsToAllLivePages()
+
+    def sendClientsToAllLivePages(self):
+        """send the list of client uris to every live websocket"""
+        sendToLiveClients({"clients":[c.updateUri for c in self.clients]})
+        
+
+class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):  # GET /: the ui page, sent as xhtml
+        self.set_header("Content-Type", "application/xhtml+xml")
+        self.write(open("light9/rdfdb.xhtml").read())
+
+class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):
+        """GET /graph (the whole graph): not implemented yet"""
+    
+class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def __init__(self, *args, **kw):
+        cyclone.web.RequestHandler.__init__(self, *args, **kw)
+        putMethod = rdfdb.makePatchEndpointPutMethod(self.settings.db.patch)
+        self.put = lambda: putMethod(self)  # PUT /patches -> db.patch
+
+    def get(self):
+        """GET /patches (recent patches): not implemented yet"""
+
+
+class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
+    def get(self):
+        """GET /graphClients (current clients): not implemented yet"""
+
+    def post(self):  # POST /graphClients: subscribe the clientUpdate uri
+        clientUri = self.get_argument("clientUpdate")
+        try:
+            self.settings.db.addClient(clientUri)
+        except:  # print the traceback here, then re-raise unchanged
+            import traceback
+            traceback.print_exc()
+            raise
+
+liveClients = set()  # the Live websocket connections that are open now
+def sendToLiveClients(d=None, asJson=None):
+    message = asJson if asJson else json.dumps(d)  # asJson goes out verbatim
+    for ws in liveClients:
+        ws.sendMessage(message)
+
+class Live(cyclone.websocket.WebSocketHandler):
+    """websocket behind /live; the open connections are kept in liveClients"""
+    def connectionMade(self, *args, **kwargs):
+        log.info("ws opened")
+        liveClients.add(self)
+        self.settings.db.sendClientsToAllLivePages()
+
+    def connectionLost(self, reason):
+        log.info("ws closed")
+        liveClients.remove(self)
+
+    def messageReceived(self, message):
+        log.info("got message %s" % message)
+        self.sendMessage(message)  # echo it back to the sender
+
+if __name__ == "__main__":
+    logging.basicConfig()
+    log = logging.getLogger()
+
+    parser = optparse.OptionParser()
+    parser.add_option('--show', default=showconfig.showUri(),
+        help='show URI, like http://light9.bigasterisk.com/show/dance2008')
+    parser.add_option("-v", "--verbose", action="store_true",
+                      help="logging.DEBUG")
+    (options, args) = parser.parse_args()
+
+    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
+
+    if not options.show:
+        raise ValueError("missing --show http://...")
+
+    # the graph files are read in here, before we start serving
+    db = Db()
+
+    port = 8051
+    routes = [
+        (r'/', Index),
+        (r'/live', Live),
+        (r'/graph', GraphResource),
+        (r'/patches', Patches),
+        (r'/graphClients', GraphClients),
+        # the one static file that the Index page loads
+        (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
+         dict(path='lib')),
+        ]
+    reactor.listenTCP(port, cyclone.web.Application(handlers=routes, db=db))
+    log.info("serving on %s" % port)
+    reactor.run()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb.py	Fri Jul 13 18:25:34 2012 +0000
@@ -0,0 +1,214 @@
+from rdflib import ConjunctiveGraph, Graph
+import json, logging, cyclone.httpclient, traceback, urllib
+from twisted.internet import reactor
+log = logging.getLogger()
+
+ALLSTMTS = (None, None, None)
+
+class Patch(object):
+    """
+    adds (quads) and deletes (triples), given as json, lists, or graphs.
+    the json representation includes the {"patch":...} wrapper"""
+    def __init__(self, jsonRepr=None, addQuads=None, delTriples=None,
+                 addGraph=None, delGraph=None):
+        self._jsonRepr = jsonRepr
+        self._addQuads, self._delTriples = addQuads, delTriples
+        self._addGraph, self._delGraph = addGraph, delGraph
+        # json, when given, replaces any graphs that were passed in
+        if self._jsonRepr is not None:
+            body = json.loads(self._jsonRepr)
+            self._delGraph = Graph()
+            self._delGraph.parse(data=body['patch']['deletes'], format='nt')
+            self._addGraph = ConjunctiveGraph()
+            self._addGraph.parse(data=body['patch']['adds'], format='nquads')
+
+    @property
+    def addQuads(self):  # the adds as given, else listed from addGraph
+        if self._addQuads is None:
+            if self._addGraph is not None:
+                self._addQuads = list(self._addGraph.quads(ALLSTMTS))
+            else:
+                raise ValueError("patch has neither addQuads nor addGraph")
+        return self._addQuads
+
+    @property
+    def delTriples(self):  # the deletes as given, else listed from delGraph
+        if self._delTriples is None:
+            if self._delGraph is not None:
+                self._delTriples = list(self._delGraph.triples(ALLSTMTS))
+            else:
+                raise ValueError("patch has neither delTriples nor delGraph")
+        return self._delTriples
+
+    @property
+    def addGraph(self):  # only set when made from json or from graphs
+        if self._addGraph is None:
+            raise ValueError("patch was not made with an addGraph")
+        return self._addGraph
+
+    @property
+    def delGraph(self):  # only set when made from json or from graphs
+        if self._delGraph is None:
+            raise ValueError("patch was not made with a delGraph")
+        return self._delGraph
+
+    @property
+    def jsonRepr(self):  # the json as given, else serialized once and kept
+        if self._jsonRepr is None:
+            addGraph = ConjunctiveGraph()
+            #addGraph.addN(addQuads) # no effect on nquad output
+            for s,p,o,c in self.addQuads:
+                addGraph.get_context(c).add((s,p,o))
+                #addGraph.store.add((s,p,o), c) # no effect on nquad output
+            delGraph = Graph()
+            for s in self.delTriples:
+                delGraph.add(s)
+            self._jsonRepr = json.dumps({"patch": {
+                'adds':addGraph.serialize(format='nquads'),
+                'deletes':delGraph.serialize(format='nt'),
+                }})
+        return self._jsonRepr
+
+def sendPatch(putUri, patch):
+    """PUT patch (as its json) to putUri; returns the request's deferred"""
+    # this will take args for sender, etc
+    payload = patch.jsonRepr
+    log.debug("send body: %r" % payload)
+    def ok(done):
+        if not str(done.code).startswith('2'):
+            raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
+        log.debug("sendPatch finished, response: %s" % done.body)
+        return done
+
+    def err(e):
+        log.warn("sendPatch failed %r" % e)
+        raise e
+
+    request = cyclone.httpclient.fetch(
+        url=putUri, method='PUT', postdata=payload,
+        headers={'Content-Type': ['application/json']})
+    # ok decides on the status code; err only sees request failures,
+    # not the ValueError that ok raises
+    return request.addCallbacks(ok, err)
+
+def makePatchEndpointPutMethod(cb):
+    def put(self):  # handler method: parse the body as a Patch, call cb
+        try:
+            received = Patch(jsonRepr=self.request.body)
+            log.info("received patch -%d +%d" % (len(received.delGraph), len(received.addGraph)))
+            cb(received)
+        except:  # print the traceback here, then re-raise unchanged
+            traceback.print_exc()
+            raise
+    return put
+
+def makePatchEndpoint(cb):
+    class Update(cyclone.web.RequestHandler):  # PUT hands the parsed Patch to cb
+        put = makePatchEndpointPutMethod(cb)
+    return Update
+
+class GraphWatchers(object):
+    """remembers which handler funcs read which (subject, predicate)"""
+    def __init__(self):
+        self._handlersSp = {} # (s,p): set(handlers)
+
+    def addSubjPredWatcher(self, func, s, p):
+        """note that func read (s, p); a func of None is ignored"""
+        if func is not None:
+            self._handlersSp.setdefault((s, p), set()).add(func)
+
+    def whoCares(self, p):
+        """what functions would care about the changes in this patch"""
+        # first pass: p isn't examined, so every watcher is returned
+        everyone = set()
+        everyone.update(*self._handlersSp.values())
+        return everyone
+
+class SyncedGraph(object):
+    """
+    api like rdflib.Graph which sends updates to rdfdb and can call
+    you back when there are graph changes
+    """
+    def __init__(self, port):
+        _graph = self._graph = ConjunctiveGraph()
+        self._watchers = GraphWatchers()
+        self.currentFunc = None # set while addHandler is running a func
+        #then i try adding a statement that i will react to if i see it
+        #then i print updates to that statement as they come
+        #and the statement has a PID in it so we can see two clientdemos competing
+        #then factor out this client, and have real light9 tools start using it to build their graphs
+        #they just do full reload on relevant subgraphs at first, get progressively better
+
+        def onPatch(p):
+            for s in p.delGraph:
+                _graph.remove(s)
+            _graph.addN(p.addGraph.quads(ALLSTMTS))
+            log.info("graph now has %s statements" % len(_graph))
+            self.updateOnPatch(p)
+
+        reactor.listenTCP(port, cyclone.web.Application(handlers=[
+            (r'/update', makePatchEndpoint(onPatch)),
+        ]))
+        self.updateResource = 'http://localhost:%s/update' % port
+        log.info("listening on %s" % port)
+        self.register()
+
+    def updateOnPatch(self, p):  # rerun the handlers that care about p
+        for func in self._watchers.whoCares(p):
+            self.addHandler(func)
+
+    def register(self):
+        """POST our update uri to rdfdb, to subscribe to its patches"""
+        def done(x):
+            print("registered %s" % x.body)
+
+        cyclone.httpclient.fetch(
+            url='http://localhost:8051/graphClients',
+            method='POST',
+            headers={'Content-Type': ['application/x-www-form-urlencoded']},
+            postdata=urllib.urlencode([('clientUpdate', self.updateResource)]),
+            ).addCallbacks(done, log.error)
+        log.info("registering with rdfdb")
+
+    def patch(self, p):
+        """send this patch to the server and apply it to our local graph and run handlers"""
+        # currently this has to round-trip. But I could apply the
+        # patch here and have the server not bounce it back to me
+        return sendPatch('http://localhost:8051/patches', p)
+
+    def addHandler(self, func):
+        """
+        run this (idempotent) func, noting what graph values it
+        uses. Run it again in the future if there are changes to those
+        graph values. The func might use different values during that
+        future call, and those will be what we watch for next.
+        """
+
+        # todo: if we saw this func before, we need to forget the old
+        # callbacks it wanted and replace with the new ones we see
+        # now.
+
+        # if one handler func calls another, the outer one is put back
+        # as currentFunc afterwards, so its later reads are still noted
+
+        # no plan for sparql queries yet. Hook into a lower layer that
+        # reveals all their statement fetches? Just make them always
+        # new? Cache their results, so if i make the query again and
+        # it gives the same result, I don't call the handler?
+
+        prev, self.currentFunc = self.currentFunc, func
+        try:
+            func()
+        finally:
+            self.currentFunc = prev
+
+    # these just call through to triples() so it might be possible to
+    # watch just that one
+    def value(self, subj, pred):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
+        return self._graph.value(subj, pred)
+
+    def objects(self, subject=None, predicate=None):
+        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
+        return self._graph.objects(subject, predicate)
+    
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb.xhtml	Fri Jul 13 18:25:34 2012 +0000
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="iso-8859-1"?>
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
+"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+  <head>
+    <title>rdfdb</title>
+    <style type="text/css" media="all">
+      /* <![CDATA[ */
+      .patch {
+          border: 1px solid gray;
+          padding: 2px;
+          margin: 4px;
+      }
+      .patch > div {
+          font-family: monospace;
+          font-size: 80%;
+          white-space: pre-wrap;
+      } 
+      .patch .adds {
+          color: #127E11;
+      }
+      .patch .deletes {
+          color: #DC6F6F;
+      }
+      /* ]]> */
+    </style>
+  </head>
+  <body>
+    <h1>rdfdb</h1>
+    <p>status: <span id="status">starting...</span></p>
+    
+    <section>
+      <h2>Edits</h2>
+      <div id="patches"></div>
+    </section>
+
+    <p>Clients: <span id="clients"/></p>
+
+    <fieldset>
+      <legend>Messages</legend>
+      <div id="out"></div>
+    </fieldset>
+
+    <script type="text/javascript" src="jquery-1.7.2.min.js"></script>
+    <script type="text/javascript">
+      // <![CDATA[
+      $(function(){
+          var ws = new WebSocket("ws://localhost:8051/live");
+          // the status line follows the state of the socket
+          ws.onopen = function() {   $("#status").text("connected"); };
+          ws.onerror = function(e) { $("#status").text("error: "+e); };
+          ws.onclose = function() {  $("#status").text("disconnected"); };
+          ws.onmessage = function (evt) {
+              var d = JSON.parse(evt.data); // {clients: [...]} or {patch: {adds, deletes}}
+              if (d.clients !== undefined) {
+                  $("#clients").empty().text(JSON.stringify(d.clients));
+              }
+              if (d.patch !== undefined) {
+                  $("#patches").prepend(
+                      $("<fieldset>").addClass("patch")
+                          .append($("<legend>").text("Patch"))
+                          .append($("<div>").addClass("deletes").text(d.patch.deletes))
+                          .append($("<div>").addClass("adds").text(d.patch.adds))
+                  );
+              }
+              // every message is also appended to the Messages box, re-encoded as a json string
+              $('#out').append($('<div>').text(JSON.stringify(evt.data)));
+          };
+      });
+      // ]]>
+    </script>
+
+  </body>
+</html>
\ No newline at end of file