Changeset - 5c158d37f1ce
[Not reviewed]
default
0 5 1
drewp@bigasterisk.com - 12 years ago 2012-07-16 00:49:57
drewp@bigasterisk.com
autoretry websocket. fix rdflib quad patching. only rerun handlers that asked for the affected subj-preds.
Ignore-this: 31e03cf07e5d460ea5c72d7beccefe7
6 files changed with 200 insertions and 42 deletions:
0 comments (0 inline, 0 general)
bin/clientdemo
Show inline comments
 
@@ -4,7 +4,7 @@ import os, sys
 
sys.path.append(".")
 
from twisted.internet import reactor
 
import cyclone.web, cyclone.httpclient, logging
 
from rdflib import Namespace, Literal
 
from rdflib import Namespace, Literal, URIRef
 
from light9.rdfdb.patch import Patch
 
from light9.rdfdb.syncedgraph import SyncedGraph
 

	
 
@@ -14,6 +14,12 @@ if __name__ == "__main__":
 

	
 
    g = SyncedGraph("clientdemo")
 

	
 
    from light9.Submaster import PersistentSubmaster
 
    sub = PersistentSubmaster(graph=g, uri=URIRef("http://light9.bigasterisk.com/sub/bcools"))
 

	
 
    #get sub to show its updating name, then push that all the way into KC gui so we can see just names refresh in there
 

	
 

	
 
    L9 = Namespace("http://light9.bigasterisk.com/")
 
    def updateDemoValue():
 
        v = list(g.objects(L9['demo'], L9['is']))
bin/rdfdb
Show inline comments
 
@@ -82,6 +82,7 @@ from light9 import networking, showconfi
 
from rdflib import ConjunctiveGraph, URIRef, Graph
 
from light9.rdfdb.graphfile import GraphFile
 
from light9.rdfdb.patch import Patch, ALLSTMTS
 
from light9.rdfdb.rdflibpatch import patchQuads
 
from light9.rdfdb import syncedgraph
 

	
 
from twisted.internet.inotify import INotify
 
@@ -128,10 +129,13 @@ class Db(object):
 
        notifier = INotify()
 
        notifier.startReading()
 

	
 
        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
 
        for inFile in [#"show/dance2012/config.n3",
 
                       "show/dance2012/subs/bcools",
 
                       #"demo.n3",
 
                       ]:
 
            self.g = GraphFile(notifier,
 
                               inFile,
 
                               URIRef("http://example.com/%s" %
 
                               URIRef("http://example.com/file/%s" %
 
                                      os.path.basename(inFile)),
 
                               self.patch,
 
                               self.getSubgraph)
 
@@ -140,21 +144,13 @@ class Db(object):
 
        """
 
        apply this patch to the master graph then notify everyone about it
 
        """
 
        log.info("patching graph with %s adds %s dels" %
 
                 (len(p.addQuads), len(p.delQuads)))
 
        log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
 

	
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        
 
        for spoc in p.delQuads:
 
            # probably need to insist that these existed, or else cull
 
            # the ones that didn't exist, to make the patch invert right
 
            self.graph.get_context(spoc[3]).remove(spoc[:3])
 

	
 
        addQuads = p.addQuads[:2] # test
 

	
 
        self.graph.addN(addQuads)
 
        self.summarizeToLog()
 
        for c in self.clients:
 
            d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads))
 
            d = c.sendPatch(p)
 
            d.addErrback(self.clientErrored, c)
 
        sendToLiveClients(asJson=p.jsonRepr)
 

	
 
@@ -162,9 +158,10 @@ class Db(object):
 
        err.trap(twisted.internet.error.ConnectError)
 
        log.info("connection error- dropping client %r" % c)
 
        self.clients.remove(c)
 
        self.sendClientsToAllLivePages()        
 

	
 
    def summarizeToLog(self):
 
        log.info("contexts in graph %s:" % len(self.graph))
 
        log.info("contexts in graph (%s total stmts):" % len(self.graph))
 
        for c in self.graph.contexts():
 
            log.info("  %s: %s statements" %
 
                     (c.identifier, len(self.getSubgraph(c.identifier))))
light9/rdfdb.xhtml
Show inline comments
 
<?xml version="1.0" encoding="iso-8859-1"?>
 
<?xml version="1.0" encoding="utf8"?>
 
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
 
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
 
<html xmlns="http://www.w3.org/1999/xhtml">
 
@@ -6,6 +6,14 @@
 
    <title>rdfdb</title>
 
    <style type="text/css" media="all">
 
      /* <![CDATA[ */
 
body {
 
background: black;
 
color: white;
 
}
 
      #patches { /* wants flex box */
 
          max-height: 27em;
 
          overflow-y: scroll;
 
      }
 
      .patch {
 
          border: 1px solid gray;
 
          padding: 2px;
 
@@ -13,14 +21,14 @@
 
      }
 
      .patch > div {
 
          font-family: monospace;
 
          font-size: 80%;
 
          font-size: 90%;
 
          white-space: pre-wrap;
 
      } 
 
      .patch .adds {
 
          color: #127E11;
 
          color: #3AEA38;
 
      }
 
      .patch .deletes {
 
          color: #DC6F6F;
 
          color: #FF2828;
 
      }
 
      #out {
 
          white-space: pre-wrap;
 
@@ -53,13 +61,8 @@
 
    <script type="text/javascript">
 
      // <![CDATA[
 
      $(function(){
 
          var ws = new WebSocket("ws://localhost:8051/live");
 
          
 
          ws.onopen = function() {   $("#status").text("connected"); };
 
          ws.onerror = function(e) { $("#status").text("error: "+e); };
 
          ws.onclose = function() {  $("#status").text("disconnected"); };
 
          ws.onmessage = function (evt) {
 
              var d = JSON.parse(evt.data);
 

	
 
          function onMessage(d) {
 
              if (d.clients !== undefined) {
 
                  $("#clients").empty().text(JSON.stringify(d.clients));
 
              }
 
@@ -72,8 +75,27 @@
 
                  );
 
              }
 

	
 
              $('#out').append($('<div>').text(evt.data));
 
          };
 
              $('#out').append($('<div>').text(JSON.stringify(d)));
 
          }
 
          
 
          var pong = 0;
 
          function connect() {
 
              var ws = new WebSocket("ws://localhost:8051/live");
 
              
 
              ws.onopen = function() {   $("#status").text("connected"); };
 
              ws.onerror = function(e) { $("#status").text("error: "+e); };
 
              ws.onclose = function() {  
 
                  pong = 1 - pong;
 
                  $("#status").text("disconnected (retrying "+(pong ? "😼":"😺")+")"); 
 
                  // this should be under a requestAnimationFrame to
 
                  // save resources
 
                  setTimeout(connect, 2000);
 
              };
 
              ws.onmessage = function (evt) {
 
                  onMessage(JSON.parse(evt.data));
 
              };
 
          }
 
          connect();
 
      });
 
      // ]]>
 
    </script>
light9/rdfdb/graphfile.py
Show inline comments
 
@@ -24,10 +24,24 @@ class GraphFile(object):
 
        """update the graph with any diffs from this file"""
 
        old = self.getSubgraph(self.uri)
 
        new = Graph()
 
        new.parse(location=self.path, format='n3')
 
        try:
 
            new.parse(location=self.path, format='n3')
 
        except SyntaxError as e:
 
            print e
 
            log.error("syntax error in %s" % self.path)
 
            return
 

	
 
        adds = [(s, p, o, self.uri) for s, p, o in new - old]
 
        dels = [(s, p, o, self.uri) for s, p, o in old - new]
 

	
 
        print "file dels"
 
        for s  in dels:
 
            print s
 
        print "file adds"
 
        for s in adds:
 
            print s
 
        print ""
 

	
 
        
 
        if adds or dels:
 
            self.patch(Patch(addQuads=adds, delQuads=dels))
light9/rdfdb/rdflibpatch.py
Show inline comments
 
new file 100644
 
"""
 
this is a proposal for a ConjunctiveGraph method in rdflib
 
"""
 

	
 
def patchQuads(graph, deleteQuads, addQuads, perfect=False):
 
    """
 
    Delete the sequence of given quads. Then add the given quads just
 
    like addN would. If perfect is True, we'll error and not touch the
 
    graph if any of the deletes isn't in the graph or if any of the
 
    adds was already in the graph.
 
    """
 
    toDelete = []
 
    for s, p, o, c in deleteQuads:
 
        stmt = (s, p, o)
 
        if perfect:
 
            if not any(graph.store.triples(stmt, c)):
 
                raise ValueError("%r not in %r" % (stmt, c))
 
            else:
 
                toDelete.append((c, stmt))
 
        else:
 
            graph.store.remove(stmt, context=c)
 
    for c, stmt in toDelete:
 
        graph.store.remove(stmt, context=c)
 

	
 
    if perfect:
 
        addQuads = list(addQuads)
 
        for spoc in addQuads:
 
            if spoc in graph:
 
                raise ValueError("%r already in %r" % (spoc[:3], spoc[3]))
 
    graph.addN(addQuads)
 

	
 
import unittest
 
from rdflib import ConjunctiveGraph, URIRef as U
 
stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1')
 
stmt2 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx2')
 
class TestPatchQuads(unittest.TestCase):
 
    def testAddsToNewContext(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        self.assert_(len(g), 1)
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [stmt1])
 

	
 
    def testDeletes(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        patchQuads(g, [stmt1], [])
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [])
 

	
 
    def testDeleteRunsBeforeAdd(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [stmt1], [stmt1])
 
        quads = list(g.quads((None,None,None)))
 
        self.assertEqual(quads, [stmt1])
 
        
 
    def testPerfectAddRejectsExistingStmt(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        self.assertRaises(ValueError, patchQuads, g, [], [stmt1], perfect=True)
 

	
 
    def testPerfectAddAllowsExistingStmtInNewContext(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1])
 
        patchQuads(g, [], [stmt2], perfect=True)
 
        self.assertEqual(len(list(g.quads((None,None,None)))), 2)
 

	
 
    def testPerfectDeleteRejectsAbsentStmt(self):
 
        g = ConjunctiveGraph()
 
        self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True)
 
        
 
    def testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1, stmt2])
 
        patchQuads(g, [stmt1], [], perfect=True)
 

	
 
    def testRedundantStmtOkForAddOrDelete(self):
 
        g = ConjunctiveGraph()
 
        patchQuads(g, [], [stmt1, stmt1], perfect=True)
 
        patchQuads(g, [stmt1, stmt1], [], perfect=True)
 
        
light9/rdfdb/syncedgraph.py
Show inline comments
 
from rdflib import ConjunctiveGraph
 
from rdflib import ConjunctiveGraph, RDFS
 
import logging, cyclone.httpclient, traceback, urllib
 
from twisted.internet import reactor
 
log = logging.getLogger()
 
from light9.rdfdb.patch import Patch, ALLSTMTS
 
from light9.rdfdb.rdflibpatch import patchQuads
 

	
 
def sendPatch(putUri, patch):
 
    # this will take args for sender, etc
 
@@ -38,6 +39,9 @@ def makePatchEndpoint(cb):
 
    return Update
 

	
 
class GraphWatchers(object):
 
    """
 
    store the current handlers that care about graph changes
 
    """
 
    def __init__(self):
 
        self._handlersSp = {} # (s,p): set(handlers)
 

	
 
@@ -45,15 +49,35 @@ class GraphWatchers(object):
 
        if func is None:
 
            return
 
        key = s, p
 
        self._handlersSp.setdefault(key, set()).add(func)
 
        try:
 
            self._handlersSp.setdefault(key, set()).add(func)
 
        except Exception:
 
            print "with key %r and func %r" % (key, func)
 
            raise
 

	
 
    def whoCares(self, p):
 
        """what functions would care about the changes in this patch"""
 
    def whoCares(self, patch):
 
        """what handler functions would care about the changes in this patch"""
 
        self.dependencies()
 
        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
 
                                [(s, p) for s, p, o, c in patch.delQuads])
 
        
 
        ret = set()
 
        for s in self._handlersSp.values():
 
            ret.update(s)
 
        for (s,p), func in self._handlersSp.iteritems():
 
            if (s,p) in affectedSubjPreds:
 
                ret.update(func)
 
        return ret
 

	
 
    def dependencies(self):
 
        """
 
        for debugging, make a list of all the active handlers and what
 
        data they depend on. This is meant for showing on the web ui
 
        for browsing.
 
        """
 
        print "whocares"
 
        from pprint import pprint
 
        pprint(self._handlersSp)
 
        
 

	
 
class SyncedGraph(object):
 
    """
 
    graph for clients to use. Changes are synced with the master graph
 
@@ -71,9 +95,7 @@ class SyncedGraph(object):
 
        self._watchers = GraphWatchers()
 
        
 
        def onPatch(p):
 
            for spoc in p.delGraph.quads(ALLSTMTS):
 
                _graph.get_context(spoc[3]).remove(spoc[:3])
 
            _graph.addN(p.addGraph.quads(ALLSTMTS))
 
            patchQuads(_graph, p.delQuads, p.addQuads)
 
            log.info("graph now has %s statements" % len(_graph))
 
            try:
 
                self.updateOnPatch(p)
 
@@ -89,10 +111,7 @@ class SyncedGraph(object):
 
        self.updateResource = 'http://localhost:%s/update' % port
 
        log.info("listening on %s" % port)
 
        self.register(label)
 

	
 
    def updateOnPatch(self, p):
 
        for func in self._watchers.whoCares(p):
 
            self.addHandler(func)
 
        self.currentFunc = None
 

	
 
    def register(self, label):
 

	
 
@@ -140,13 +159,32 @@ class SyncedGraph(object):
 
        finally:
 
            self.currentFunc = None
 

	
 
    def updateOnPatch(self, p):
 
        """
 
        patch p just happened to the graph; call everyone back who
 
        might care, and then notice what data they depend on now
 
        """
 
        for func in self._watchers.whoCares(p):
 
            # and forget the old one!
 
            self.addHandler(func)
 

	
 
    def _assertCurrent(self):
 
        if self.currentFunc is None:
 
            # this may become a warning later
 
            raise ValueError("asked for graph data outside of a handler")
 

	
 
    # these just call through to triples() so it might be possible to
 
    # watch just that one
 
    def value(self, subj, pred):
 
        self._assertCurrent()
 
        self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
 
        return self._graph.value(subj, pred)
 

	
 
    def objects(self, subject=None, predicate=None):
 
        self._assertCurrent()
 
        self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
 
        return self._graph.objects(subject, predicate)
 
    
 
    def label(self, uri):
 
        self._assertCurrent()
 
        return self.value(uri, RDFS.label)
0 comments (0 inline, 0 general)