changeset 798:5c158d37f1ce

autoretry websocket. fix rdflib quad patching. only rerun handlers that asked for the affected subj-preds. Ignore-this: 31e03cf07e5d460ea5c72d7beccefe7
author drewp@bigasterisk.com
date Mon, 16 Jul 2012 00:49:57 +0000
parents 904913de4599
children fcf95ff23cc5
files bin/clientdemo bin/rdfdb light9/rdfdb.xhtml light9/rdfdb/graphfile.py light9/rdfdb/rdflibpatch.py light9/rdfdb/syncedgraph.py
diffstat 6 files changed, 200 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/bin/clientdemo	Fri Jul 13 19:25:03 2012 +0000
+++ b/bin/clientdemo	Mon Jul 16 00:49:57 2012 +0000
@@ -4,7 +4,7 @@
 sys.path.append(".")
 from twisted.internet import reactor
 import cyclone.web, cyclone.httpclient, logging
-from rdflib import Namespace, Literal
+from rdflib import Namespace, Literal, URIRef
 from light9.rdfdb.patch import Patch
 from light9.rdfdb.syncedgraph import SyncedGraph
 
@@ -14,6 +14,12 @@
 
     g = SyncedGraph("clientdemo")
 
+    from light9.Submaster import PersistentSubmaster
+    sub = PersistentSubmaster(graph=g, uri=URIRef("http://light9.bigasterisk.com/sub/bcools"))
+
+    #get sub to show its updating name, then push that all the way into KC gui so we can see just names refresh in there
+
+
     L9 = Namespace("http://light9.bigasterisk.com/")
     def updateDemoValue():
         v = list(g.objects(L9['demo'], L9['is']))
--- a/bin/rdfdb	Fri Jul 13 19:25:03 2012 +0000
+++ b/bin/rdfdb	Mon Jul 16 00:49:57 2012 +0000
@@ -82,6 +82,7 @@
 from rdflib import ConjunctiveGraph, URIRef, Graph
 from light9.rdfdb.graphfile import GraphFile
 from light9.rdfdb.patch import Patch, ALLSTMTS
+from light9.rdfdb.rdflibpatch import patchQuads
 from light9.rdfdb import syncedgraph
 
 from twisted.internet.inotify import INotify
@@ -128,10 +129,13 @@
         notifier = INotify()
         notifier.startReading()
 
-        for inFile in ["show/dance2012/config.n3", "demo.n3"]:
+        for inFile in [#"show/dance2012/config.n3",
+                       "show/dance2012/subs/bcools",
+                       #"demo.n3",
+                       ]:
             self.g = GraphFile(notifier,
                                inFile,
-                               URIRef("http://example.com/%s" %
+                               URIRef("http://example.com/file/%s" %
                                       os.path.basename(inFile)),
                                self.patch,
                                self.getSubgraph)
@@ -140,21 +144,13 @@
         """
         apply this patch to the master graph then notify everyone about it
         """
-        log.info("patching graph with %s adds %s dels" %
-                 (len(p.addQuads), len(p.delQuads)))
+        log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
 
+        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
         
-        for spoc in p.delQuads:
-            # probably need to insist that these existed, or else cull
-            # the ones that didn't exist, to make the patch invert right
-            self.graph.get_context(spoc[3]).remove(spoc[:3])
-
-        addQuads = p.addQuads[:2] # test
-
-        self.graph.addN(addQuads)
         self.summarizeToLog()
         for c in self.clients:
-            d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads))
+            d = c.sendPatch(p)
             d.addErrback(self.clientErrored, c)
         sendToLiveClients(asJson=p.jsonRepr)
 
@@ -162,9 +158,10 @@
         err.trap(twisted.internet.error.ConnectError)
         log.info("connection error- dropping client %r" % c)
         self.clients.remove(c)
+        self.sendClientsToAllLivePages()        
 
     def summarizeToLog(self):
-        log.info("contexts in graph %s:" % len(self.graph))
+        log.info("contexts in graph (%s total stmts):" % len(self.graph))
         for c in self.graph.contexts():
             log.info("  %s: %s statements" %
                      (c.identifier, len(self.getSubgraph(c.identifier))))
--- a/light9/rdfdb.xhtml	Fri Jul 13 19:25:03 2012 +0000
+++ b/light9/rdfdb.xhtml	Mon Jul 16 00:49:57 2012 +0000
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="iso-8859-1"?>
+<?xml version="1.0" encoding="utf8"?>
 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
 "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
 <html xmlns="http://www.w3.org/1999/xhtml">
@@ -6,6 +6,14 @@
     <title>rdfdb</title>
     <style type="text/css" media="all">
       /* <![CDATA[ */
+body {
+background: black;
+color: white;
+}
+      #patches { /* wants flex box */
+          max-height: 27em;
+          overflow-y: scroll;
+      }
       .patch {
           border: 1px solid gray;
           padding: 2px;
@@ -13,14 +21,14 @@
       }
       .patch > div {
           font-family: monospace;
-          font-size: 80%;
+          font-size: 90%;
           white-space: pre-wrap;
       } 
       .patch .adds {
-          color: #127E11;
+          color: #3AEA38;
       }
       .patch .deletes {
-          color: #DC6F6F;
+          color: #FF2828;
       }
       #out {
           white-space: pre-wrap;
@@ -53,13 +61,8 @@
     <script type="text/javascript">
       // <![CDATA[
       $(function(){
-          var ws = new WebSocket("ws://localhost:8051/live");
-          
-          ws.onopen = function() {   $("#status").text("connected"); };
-          ws.onerror = function(e) { $("#status").text("error: "+e); };
-          ws.onclose = function() {  $("#status").text("disconnected"); };
-          ws.onmessage = function (evt) {
-              var d = JSON.parse(evt.data);
+
+          function onMessage(d) {
               if (d.clients !== undefined) {
                   $("#clients").empty().text(JSON.stringify(d.clients));
               }
@@ -72,8 +75,27 @@
                   );
               }
 
-              $('#out').append($('<div>').text(evt.data));
-          };
+              $('#out').append($('<div>').text(JSON.stringify(d)));
+          }
+          
+          var pong = 0;
+          function connect() {
+              var ws = new WebSocket("ws://localhost:8051/live");
+              
+              ws.onopen = function() {   $("#status").text("connected"); };
+              ws.onerror = function(e) { $("#status").text("error: "+e); };
+              ws.onclose = function() {  
+                  pong = 1 - pong;
+                  $("#status").text("disconnected (retrying "+(pong ? "😼":"😺")+")"); 
+                  // this should be under a requestAnimationFrame to
+                  // save resources
+                  setTimeout(connect, 2000);
+              };
+              ws.onmessage = function (evt) {
+                  onMessage(JSON.parse(evt.data));
+              };
+          }
+          connect();
       });
       // ]]>
     </script>
--- a/light9/rdfdb/graphfile.py	Fri Jul 13 19:25:03 2012 +0000
+++ b/light9/rdfdb/graphfile.py	Mon Jul 16 00:49:57 2012 +0000
@@ -24,10 +24,24 @@
         """update the graph with any diffs from this file"""
         old = self.getSubgraph(self.uri)
         new = Graph()
-        new.parse(location=self.path, format='n3')
+        try:
+            new.parse(location=self.path, format='n3')
+        except SyntaxError as e:
+            print e
+            log.error("syntax error in %s" % self.path)
+            return
 
         adds = [(s, p, o, self.uri) for s, p, o in new - old]
         dels = [(s, p, o, self.uri) for s, p, o in old - new]
 
+        print "file dels"
+        for s  in dels:
+            print s
+        print "file adds"
+        for s in adds:
+            print s
+        print ""
+
+        
         if adds or dels:
             self.patch(Patch(addQuads=adds, delQuads=dels))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/light9/rdfdb/rdflibpatch.py	Mon Jul 16 00:49:57 2012 +0000
@@ -0,0 +1,81 @@
+"""
+this is a proposal for a ConjunctiveGraph method in rdflib
+"""
+
+def patchQuads(graph, deleteQuads, addQuads, perfect=False):
+    """
+    Delete the sequence of given quads. Then add the given quads just
+    like addN would. If perfect is True, we'll error and not touch the
+    graph if any of the deletes isn't in the graph or if any of the
+    adds was already in the graph.
+    """
+    toDelete = []
+    for s, p, o, c in deleteQuads:
+        stmt = (s, p, o)
+        if perfect:
+            if not any(graph.store.triples(stmt, c)):
+                raise ValueError("%r not in %r" % (stmt, c))
+            else:
+                toDelete.append((c, stmt))
+        else:
+            graph.store.remove(stmt, context=c)
+    for c, stmt in toDelete:
+        graph.store.remove(stmt, context=c)
+
+    if perfect:
+        addQuads = list(addQuads)
+        for spoc in addQuads:
+            if spoc in graph:
+                raise ValueError("%r already in %r" % (spoc[:3], spoc[3]))
+    graph.addN(addQuads)
+
+import unittest
+from rdflib import ConjunctiveGraph, URIRef as U
+stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1')
+stmt2 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx2')
+class TestPatchQuads(unittest.TestCase):
+    def testAddsToNewContext(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1])
+        self.assert_(len(g), 1)
+        quads = list(g.quads((None,None,None)))
+        self.assertEqual(quads, [stmt1])
+
+    def testDeletes(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1])
+        patchQuads(g, [stmt1], [])
+        quads = list(g.quads((None,None,None)))
+        self.assertEqual(quads, [])
+
+    def testDeleteRunsBeforeAdd(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [stmt1], [stmt1])
+        quads = list(g.quads((None,None,None)))
+        self.assertEqual(quads, [stmt1])
+        
+    def testPerfectAddRejectsExistingStmt(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1])
+        self.assertRaises(ValueError, patchQuads, g, [], [stmt1], perfect=True)
+
+    def testPerfectAddAllowsExistingStmtInNewContext(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1])
+        patchQuads(g, [], [stmt2], perfect=True)
+        self.assertEqual(len(list(g.quads((None,None,None)))), 2)
+
+    def testPerfectDeleteRejectsAbsentStmt(self):
+        g = ConjunctiveGraph()
+        self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True)
+        
+    def testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1, stmt2])
+        patchQuads(g, [stmt1], [], perfect=True)
+
+    def testRedundantStmtOkForAddOrDelete(self):
+        g = ConjunctiveGraph()
+        patchQuads(g, [], [stmt1, stmt1], perfect=True)
+        patchQuads(g, [stmt1, stmt1], [], perfect=True)
+        
--- a/light9/rdfdb/syncedgraph.py	Fri Jul 13 19:25:03 2012 +0000
+++ b/light9/rdfdb/syncedgraph.py	Mon Jul 16 00:49:57 2012 +0000
@@ -1,8 +1,9 @@
-from rdflib import ConjunctiveGraph
+from rdflib import ConjunctiveGraph, RDFS
 import logging, cyclone.httpclient, traceback, urllib
 from twisted.internet import reactor
 log = logging.getLogger()
 from light9.rdfdb.patch import Patch, ALLSTMTS
+from light9.rdfdb.rdflibpatch import patchQuads
 
 def sendPatch(putUri, patch):
     # this will take args for sender, etc
@@ -38,6 +39,9 @@
     return Update
 
 class GraphWatchers(object):
+    """
+    store the current handlers that care about graph changes
+    """
     def __init__(self):
         self._handlersSp = {} # (s,p): set(handlers)
 
@@ -45,15 +49,35 @@
         if func is None:
             return
         key = s, p
-        self._handlersSp.setdefault(key, set()).add(func)
+        try:
+            self._handlersSp.setdefault(key, set()).add(func)
+        except Exception:
+            print "with key %r and func %r" % (key, func)
+            raise
 
-    def whoCares(self, p):
-        """what functions would care about the changes in this patch"""
+    def whoCares(self, patch):
+        """what handler functions would care about the changes in this patch"""
+        self.dependencies()
+        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
+                                [(s, p) for s, p, o, c in patch.delQuads])
+        
         ret = set()
-        for s in self._handlersSp.values():
-            ret.update(s)
+        for (s,p), func in self._handlersSp.iteritems():
+            if (s,p) in affectedSubjPreds:
+                ret.update(func)
         return ret
 
+    def dependencies(self):
+        """
+        for debugging, make a list of all the active handlers and what
+        data they depend on. This is meant for showing on the web ui
+        for browsing.
+        """
+        print "whocares"
+        from pprint import pprint
+        pprint(self._handlersSp)
+        
+
 class SyncedGraph(object):
     """
     graph for clients to use. Changes are synced with the master graph
@@ -71,9 +95,7 @@
         self._watchers = GraphWatchers()
         
         def onPatch(p):
-            for spoc in p.delGraph.quads(ALLSTMTS):
-                _graph.get_context(spoc[3]).remove(spoc[:3])
-            _graph.addN(p.addGraph.quads(ALLSTMTS))
+            patchQuads(_graph, p.delQuads, p.addQuads)
             log.info("graph now has %s statements" % len(_graph))
             try:
                 self.updateOnPatch(p)
@@ -89,10 +111,7 @@
         self.updateResource = 'http://localhost:%s/update' % port
         log.info("listening on %s" % port)
         self.register(label)
-
-    def updateOnPatch(self, p):
-        for func in self._watchers.whoCares(p):
-            self.addHandler(func)
+        self.currentFunc = None
 
     def register(self, label):
 
@@ -140,13 +159,32 @@
         finally:
             self.currentFunc = None
 
+    def updateOnPatch(self, p):
+        """
+        patch p just happened to the graph; call everyone back who
+        might care, and then notice what data they depend on now
+        """
+        for func in self._watchers.whoCares(p):
+            # and forget the old one!
+            self.addHandler(func)
+
+    def _assertCurrent(self):
+        if self.currentFunc is None:
+            # this may become a warning later
+            raise ValueError("asked for graph data outside of a handler")
+
     # these just call through to triples() so it might be possible to
     # watch just that one
     def value(self, subj, pred):
+        self._assertCurrent()
         self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred)
         return self._graph.value(subj, pred)
 
     def objects(self, subject=None, predicate=None):
+        self._assertCurrent()
         self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate)
         return self._graph.objects(subject, predicate)
     
+    def label(self, uri):
+        self._assertCurrent()
+        return self.value(uri, RDFS.label)