Changeset - 5c158d37f1ce
[Not reviewed]
default
0 5 1
drewp@bigasterisk.com - 12 years ago 2012-07-16 00:49:57
drewp@bigasterisk.com
autoretry websocket. fix rdflib quad patching. only rerun handlers that asked for the affected subj-preds.
Ignore-this: 31e03cf07e5d460ea5c72d7beccefe7
6 files changed with 200 insertions and 42 deletions:
0 comments (0 inline, 0 general)
bin/clientdemo
Show inline comments
 
#!bin/python
 

	
 
import os, sys
 
sys.path.append(".")
 
from twisted.internet import reactor
 
import cyclone.web, cyclone.httpclient, logging
 
from rdflib import Namespace, Literal
 
from rdflib import Namespace, Literal, URIRef
 
from light9.rdfdb.patch import Patch
 
from light9.rdfdb.syncedgraph import SyncedGraph
 

	
 
if __name__ == "__main__":
 
    logging.basicConfig(level=logging.DEBUG)
 
    log = logging.getLogger()
 

	
 
    g = SyncedGraph("clientdemo")
 

	
 
    from light9.Submaster import PersistentSubmaster
 
    sub = PersistentSubmaster(graph=g, uri=URIRef("http://light9.bigasterisk.com/sub/bcools"))
 

	
 
    #get sub to show its updating name, then push that all the way into KC gui so we can see just names refresh in there
 

	
 

	
 
    L9 = Namespace("http://light9.bigasterisk.com/")
 
    def updateDemoValue():
 
        v = list(g.objects(L9['demo'], L9['is']))
 
        print "demo value is %r" % v
 

	
 
    g.addHandler(updateDemoValue)
bin/rdfdb
Show inline comments
 
@@ -79,12 +79,13 @@ import sys, optparse, logging, json, os
 
import cyclone.web, cyclone.httpclient, cyclone.websocket
 
sys.path.append(".")
 
from light9 import networking, showconfig
 
from rdflib import ConjunctiveGraph, URIRef, Graph
 
from light9.rdfdb.graphfile import GraphFile
 
from light9.rdfdb.patch import Patch, ALLSTMTS
 
from light9.rdfdb.rdflibpatch import patchQuads
 
from light9.rdfdb import syncedgraph
 

	
 
from twisted.internet.inotify import INotify
 
logging.basicConfig(level=logging.DEBUG)
 
log = logging.getLogger()
 

	
 
@@ -125,49 +126,45 @@ class Db(object):
 
        self.clients = []
 
        self.graph = ConjunctiveGraph()
 

	
 
        notifier = INotify()
 
        notifier.startReading()
 

	
 
        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
 
        for inFile in [#"show/dance2012/config.n3",
 
                       "show/dance2012/subs/bcools",
 
                       #"demo.n3",
 
                       ]:
 
            self.g = GraphFile(notifier,
 
                               inFile,
 
                               URIRef("http://example.com/%s" %
 
                               URIRef("http://example.com/file/%s" %
 
                                      os.path.basename(inFile)),
 
                               self.patch,
 
                               self.getSubgraph)
 

	
 
    def patch(self, p):
 
        """
 
        apply this patch to the master graph then notify everyone about it
 
        """
 
        log.info("patching graph with %s adds %s dels" %
 
                 (len(p.addQuads), len(p.delQuads)))
 
        log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
 

	
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        
 
        for spoc in p.delQuads:
 
            # probably need to insist that these existed, or else cull
 
            # the ones that didn't exist, to make the patch invert right
 
            self.graph.get_context(spoc[3]).remove(spoc[:3])
 

	
 
        addQuads = p.addQuads[:2] # test
 

	
 
        self.graph.addN(addQuads)
 
        self.summarizeToLog()
 
        for c in self.clients:
 
            d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads))
 
            d = c.sendPatch(p)
 
            d.addErrback(self.clientErrored, c)
 
        sendToLiveClients(asJson=p.jsonRepr)
 

	
 
    def clientErrored(self, err, c):
 
        err.trap(twisted.internet.error.ConnectError)
 
        log.info("connection error- dropping client %r" % c)
 
        self.clients.remove(c)
 
        self.sendClientsToAllLivePages()        
 

	
 
    def summarizeToLog(self):
 
        log.info("contexts in graph %s:" % len(self.graph))
 
        log.info("contexts in graph (%s total stmts):" % len(self.graph))
 
        for c in self.graph.contexts():
 
            log.info("  %s: %s statements" %
 
                     (c.identifier, len(self.getSubgraph(c.identifier))))
 

	
 
    def getSubgraph(self, uri):
 
        # this is returning an empty Graph :(
light9/rdfdb.xhtml
Show inline comments
 
<?xml version="1.0" encoding="iso-8859-1"?>
 
<?xml version="1.0" encoding="utf8"?>
 
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
 
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
 
<html xmlns="http://www.w3.org/1999/xhtml">
 
  <head>
 
    <title>rdfdb</title>
 
    <style type="text/css" media="all">
 
      /* <![CDATA[ */
 
body {
 
background: black;
 
color: white;
 
}
 
      #patches { /* wants flex box */
 
          max-height: 27em;
 
          overflow-y: scroll;
 
      }
 
      .patch {
 
          border: 1px solid gray;
 
          padding: 2px;
 
          margin: 4px;
 
      }
 
      .patch > div {
 
          font-family: monospace;
 
          font-size: 80%;
 
          font-size: 90%;
 
          white-space: pre-wrap;
 
      } 
 
      .patch .adds {
 
          color: #127E11;
 
          color: #3AEA38;
 
      }
 
      .patch .deletes {
 
          color: #DC6F6F;
 
          color: #FF2828;
 
      }
 
      #out {
 
          white-space: pre-wrap;
 
      }
 
      .patch fieldset {
 
          color: gray;
 
@@ -50,33 +58,47 @@
 
    </fieldset>
 

	
 
    <script type="text/javascript" src="jquery-1.7.2.min.js"></script>
 
    <script type="text/javascript">
 
      // <![CDATA[
 
      $(function(){
 
          var ws = new WebSocket("ws://localhost:8051/live");
 
          
 
          ws.onopen = function() {   $("#status").text("connected"); };
 
          ws.onerror = function(e) { $("#status").text("error: "+e); };
 
          ws.onclose = function() {  $("#status").text("disconnected"); };
 
          ws.onmessage = function (evt) {
 
              var d = JSON.parse(evt.data);
 

	
 
          function onMessage(d) {
 
              if (d.clients !== undefined) {
 
                  $("#clients").empty().text(JSON.stringify(d.clients));
 
              }
 
              if (d.patch !== undefined) {
 
                  $("#patches").prepend(
 
                      $("<fieldset>").addClass("patch")
 
                          .append($("<legend>").text("Patch"))
 
                          .append($("<div>").addClass("deletes").text(d.patch.deletes))
 
                          .append($("<div>").addClass("adds").text(d.patch.adds))
 
                  );
 
              }
 

	
 
              $('#out').append($('<div>').text(evt.data));
 
          };
 
              $('#out').append($('<div>').text(JSON.stringify(d)));
 
          }
 
          
 
          var pong = 0;
 
          function connect() {
 
              var ws = new WebSocket("ws://localhost:8051/live");
 
              
 
              ws.onopen = function() {   $("#status").text("connected"); };
 
              ws.onerror = function(e) { $("#status").text("error: "+e); };
 
              ws.onclose = function() {  
 
                  pong = 1 - pong;
 
                  $("#status").text("disconnected (retrying "+(pong ? "😼":"😺")+")"); 
 
                  // this should be under a requestAnimationFrame to
 
                  // save resources
 
                  setTimeout(connect, 2000);
 
              };
 
              ws.onmessage = function (evt) {
 
                  onMessage(JSON.parse(evt.data));
 
              };
 
          }
 
          connect();
 
      });
 
      // ]]>
 
    </script>
 

	
 
  </body>
 
</html>
 
\ No newline at end of file
light9/rdfdb/graphfile.py
Show inline comments
 
@@ -21,13 +21,27 @@ class GraphFile(object):
 
        self.reread()
 

	
 
    def reread(self):
 
        """update the graph with any diffs from this file"""
 
        old = self.getSubgraph(self.uri)
 
        new = Graph()
 
        new.parse(location=self.path, format='n3')
 
        try:
 
            new.parse(location=self.path, format='n3')
 
        except SyntaxError as e:
 
            print e
 
            log.error("syntax error in %s" % self.path)
 
            return
 

	
 
        adds = [(s, p, o, self.uri) for s, p, o in new - old]
 
        dels = [(s, p, o, self.uri) for s, p, o in old - new]
 

	
 
        print "file dels"
 
        for s  in dels:
 
            print s
 
        print "file adds"
 
        for s in adds:
 
            print s
 
        print ""
 

	
 
        
 
        if adds or dels:
 
            self.patch(Patch(addQuads=adds, delQuads=dels))
light9/rdfdb/rdflibpatch.py
Show inline comments
 
new file 100644
 
"""
 
this is a proposal for a ConjunctiveGraph method in rdflib
 
"""
 

	
 
def patchQuads(graph, deleteQuads, addQuads, perfect=False):
 
    """
 
    Delete the sequence of given quads. Then add the given quads just
 
    like addN would. If perfect is True, we'll error and not touch the
 
    graph if any of the deletes isn't in the graph or if any of the
 
    adds was already in the graph.
 
    """
 
    toDelete = []
 
    for s, p, o, c in deleteQuads:
 
        stmt = (s, p, o)
 
        if perfect:
 
            if not any(graph.store.triples(stmt, c)):
 
                raise ValueError("%r not in %r" % (stmt, c))
 
            else:
 
                toDelete.append((c, stmt))
 
        else:
 
            graph.store.remove(stmt, context=c)
 
    for c, stmt in toDelete:
 
        graph.store.remove(stmt, context=c)
 

	
 
    if perfect:
 
        addQuads = list(addQuads)
 
        for spoc in addQuads:
 
            if spoc in graph:
 
                raise ValueError("%r already in %r" % (spoc[:3], spoc[3]))
 
    graph.addN(addQuads)
 

	
 
import unittest
 
from rdflib import ConjunctiveGraph, URIRef as U
 
stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1')
 
stmt2 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx2')
 
class TestPatchQuads(unittest.TestCase):
 
    def testAddsToNewContext(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        self.assert_(len(g), 1)
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [stmt1])
 

	
 
    def testDeletes(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        patchQuads(g, [stmt1], [])
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [])
 

	
 
    def testDeleteRunsBeforeAdd(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [stmt1], [stmt1])
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [stmt1])
 
        
 
    def testPerfectAddRejectsExistingStmt(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        self.assertRaises(ValueError, patchQuads, g, [], [stmt1], perfect=True)
 

	
 
    def testPerfectAddAllowsExistingStmtInNewContext(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        patchQuads(g, [], [stmt2], perfect=True)
 
        self.assertEqual(len(list(g.quads((None,None,None)))), 2)
 

	
 
    def testPerfectDeleteRejectsAbsentStmt(self):
 
        g = ConjunctiveGraph()
 
        self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True)
 
        
 
    def testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1, stmt2])
 
        patchQuads(g, [stmt1], [], perfect=True)
 

	
 
    def testRedundantStmtOkForAddOrDelete(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1, stmt1], perfect=True)
 
        patchQuads(g, [stmt1, stmt1], [], perfect=True)
 
        
light9/rdfdb/syncedgraph.py
Show inline comments
 
from rdflib import ConjunctiveGraph
 
from rdflib import ConjunctiveGraph, RDFS
 
import logging, cyclone.httpclient, traceback, urllib
 
from twisted.internet import reactor
 
log = logging.getLogger()
 
from light9.rdfdb.patch import Patch, ALLSTMTS
 
from light9.rdfdb.rdflibpatch import patchQuads
 

	
 
def sendPatch(putUri, patch):
 
    # this will take args for sender, etc
 
    body = patch.jsonRepr
 
    log.debug("send body: %r" % body)
 
    def ok(done):
 
@@ -35,28 +36,51 @@ def makePatchEndpointPutMethod(cb):
 
def makePatchEndpoint(cb):
 
    class Update(cyclone.web.RequestHandler):
 
        put = makePatchEndpointPutMethod(cb)
 
    return Update
 

	
 
class GraphWatchers(object):
 
    """
 
    store the current handlers that care about graph changes
 
    """
 
    def __init__(self):
 
        self._handlersSp = {} # (s,p): set(handlers)
 

	
 
    def addSubjPredWatcher(self, func, s, p):
 
        if func is None:
 
            return
 
        key = s, p
 
        self._handlersSp.setdefault(key, set()).add(func)
 
        try:
 
            self._handlersSp.setdefault(key, set()).add(func)
 
        except Exception:
 
            print "with key %r and func %r" % (key, func)
 
            raise
 

	
 
    def whoCares(self, p):
 
        """what functions would care about the changes in this patch"""
 
    def whoCares(self, patch):
 
        """what handler functions would care about the changes in this patch"""
 
        self.dependencies()
 
        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
 
                                [(s, p) for s, p, o, c in patch.delQuads])
 
        
 
        ret = set()
 
        for s in self._handlersSp.values():
 
            ret.update(s)
 
        for (s,p), func in self._handlersSp.iteritems():
 
            if (s,p) in affectedSubjPreds:
 
                ret.update(func)
 
        return ret
 

	
 
    def dependencies(self):
 
        """
 
        for debugging, make a list of all the active handlers and what
 
        data they depend on. This is meant for showing on the web ui
 
        for browsing.
 
        """
 
        print "whocares"
 
        from pprint import pprint
 
        pprint(self._handlersSp)
 
        
 

	
 
class SyncedGraph(object):
 
    """
 
    graph for clients to use. Changes are synced with the master graph
 
    in the rdfdb process. 
 
    
 
    This api is like rdflib.Graph but it can also call you back when
 
@@ -68,15 +92,13 @@ class SyncedGraph(object):
 
        with your connection
 
        """
 
        _graph = self._graph = ConjunctiveGraph()
 
        self._watchers = GraphWatchers()
 
        
 
        def onPatch(p):
 
            for spoc in p.delGraph.quads(ALLSTMTS):
 
                _graph.get_context(spoc[3]).remove(spoc[:3])
 
            _graph.addN(p.addGraph.quads(ALLSTMTS))
 
            patchQuads(_graph, p.delQuads, p.addQuads)
 
            log.info("graph now has %s statements" % len(_graph))
 
            try:
 
                self.updateOnPatch(p)
 
            except Exception:
 
                # don't reflect this back to the server; we did
 
                # receive its patch correctly.
 
@@ -86,16 +108,13 @@ class SyncedGraph(object):
 
            (r'/update', makePatchEndpoint(onPatch)),
 
        ]))
 
        port = listen._realPortNumber  # what's the right call for this?
 
        self.updateResource = 'http://localhost:%s/update' % port
 
        log.info("listening on %s" % port)
 
        self.register(label)
 

	
 
    def updateOnPatch(self, p):
 
        for func in self._watchers.whoCares(p):
 
            self.addHandler(func)
 
        self.currentFunc = None
 

	
 
    def register(self, label):
 

	
 
        def done(x):
 
            print "registered", x.body
 

	
 
@@ -137,16 +156,35 @@ class SyncedGraph(object):
 
        self.currentFunc = func
 
        try:
 
            func()
 
        finally:
 
            self.currentFunc = None
 

	
 
    def updateOnPatch(self, p):
 
        """
 
        patch p just happened to the graph; call everyone back who
 
        might care, and then notice what data they depend on now
 
        """
 
        for func in self._watchers.whoCares(p):
 
            # and forget the old one!
 
            self.addHandler(func)
 

	
 
    def _assertCurrent(self):
 
        if self.currentFunc is None:
 
            # this may become a warning later
 
            raise ValueError("asked for graph data outside of a handler")
 

	
 
    # these just call through to triples() so it might be possible to
 
    # watch just that one
 
    def value(self, subj, pred):
 
        self._assertCurrent()
 
        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
 
        return self._graph.value(subj, pred)
 

	
 
    def objects(self, subject=None, predicate=None):
 
        self._assertCurrent()
 
        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
 
        return self._graph.objects(subject, predicate)
 
    
 
    def label(self, uri):
 
        self._assertCurrent()
 
        return self.value(uri, RDFS.label)
0 comments (0 inline, 0 general)