Mercurial > code > home > repos > light9
changeset 798:5c158d37f1ce
autoretry websocket. fix rdflib quad patching. only rerun handlers that asked for the affected subj-preds.
Ignore-this: 31e03cf07e5d460ea5c72d7beccefe7
author | drewp@bigasterisk.com |
---|---|
date | Mon, 16 Jul 2012 00:49:57 +0000 |
parents | 904913de4599 |
children | fcf95ff23cc5 |
files | bin/clientdemo bin/rdfdb light9/rdfdb.xhtml light9/rdfdb/graphfile.py light9/rdfdb/rdflibpatch.py light9/rdfdb/syncedgraph.py |
diffstat | 6 files changed, 200 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/clientdemo Fri Jul 13 19:25:03 2012 +0000 +++ b/bin/clientdemo Mon Jul 16 00:49:57 2012 +0000 @@ -4,7 +4,7 @@ sys.path.append(".") from twisted.internet import reactor import cyclone.web, cyclone.httpclient, logging -from rdflib import Namespace, Literal +from rdflib import Namespace, Literal, URIRef from light9.rdfdb.patch import Patch from light9.rdfdb.syncedgraph import SyncedGraph @@ -14,6 +14,12 @@ g = SyncedGraph("clientdemo") + from light9.Submaster import PersistentSubmaster + sub = PersistentSubmaster(graph=g, uri=URIRef("http://light9.bigasterisk.com/sub/bcools")) + + #get sub to show its updating name, then push that all the way into KC gui so we can see just names refresh in there + + L9 = Namespace("http://light9.bigasterisk.com/") def updateDemoValue(): v = list(g.objects(L9['demo'], L9['is']))
--- a/bin/rdfdb Fri Jul 13 19:25:03 2012 +0000 +++ b/bin/rdfdb Mon Jul 16 00:49:57 2012 +0000 @@ -82,6 +82,7 @@ from rdflib import ConjunctiveGraph, URIRef, Graph from light9.rdfdb.graphfile import GraphFile from light9.rdfdb.patch import Patch, ALLSTMTS +from light9.rdfdb.rdflibpatch import patchQuads from light9.rdfdb import syncedgraph from twisted.internet.inotify import INotify @@ -128,10 +129,13 @@ notifier = INotify() notifier.startReading() - for inFile in ["show/dance2012/config.n3", "demo.n3"]: + for inFile in [#"show/dance2012/config.n3", + "show/dance2012/subs/bcools", + #"demo.n3", + ]: self.g = GraphFile(notifier, inFile, - URIRef("http://example.com/%s" % + URIRef("http://example.com/file/%s" % os.path.basename(inFile)), self.patch, self.getSubgraph) @@ -140,21 +144,13 @@ """ apply this patch to the master graph then notify everyone about it """ - log.info("patching graph with %s adds %s dels" % - (len(p.addQuads), len(p.delQuads))) + log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads))) + patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True) - for spoc in p.delQuads: - # probably need to insist that these existed, or else cull - # the ones that didn't exist, to make the patch invert right - self.graph.get_context(spoc[3]).remove(spoc[:3]) - - addQuads = p.addQuads[:2] # test - - self.graph.addN(addQuads) self.summarizeToLog() for c in self.clients: - d = c.sendPatch(Patch(addQuads=addQuads, delQuads=p.delQuads)) + d = c.sendPatch(p) d.addErrback(self.clientErrored, c) sendToLiveClients(asJson=p.jsonRepr) @@ -162,9 +158,10 @@ err.trap(twisted.internet.error.ConnectError) log.info("connection error- dropping client %r" % c) self.clients.remove(c) + self.sendClientsToAllLivePages() def summarizeToLog(self): - log.info("contexts in graph %s:" % len(self.graph)) + log.info("contexts in graph (%s total stmts):" % len(self.graph)) for c in self.graph.contexts(): log.info(" %s: %s statements" % (c.identifier, len(self.getSubgraph(c.identifier))))
--- a/light9/rdfdb.xhtml Fri Jul 13 19:25:03 2012 +0000 +++ b/light9/rdfdb.xhtml Mon Jul 16 00:49:57 2012 +0000 @@ -1,4 +1,4 @@ -<?xml version="1.0" encoding="iso-8859-1"?> +<?xml version="1.0" encoding="utf8"?> <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> @@ -6,6 +6,14 @@ <title>rdfdb</title> <style type="text/css" media="all"> /* <![CDATA[ */ +body { +background: black; +color: white; +} + #patches { /* wants flex box */ + max-height: 27em; + overflow-y: scroll; + } .patch { border: 1px solid gray; padding: 2px; @@ -13,14 +21,14 @@ } .patch > div { font-family: monospace; - font-size: 80%; + font-size: 90%; white-space: pre-wrap; } .patch .adds { - color: #127E11; + color: #3AEA38; } .patch .deletes { - color: #DC6F6F; + color: #FF2828; } #out { white-space: pre-wrap; @@ -53,13 +61,8 @@ <script type="text/javascript"> // <![CDATA[ $(function(){ - var ws = new WebSocket("ws://localhost:8051/live"); - - ws.onopen = function() { $("#status").text("connected"); }; - ws.onerror = function(e) { $("#status").text("error: "+e); }; - ws.onclose = function() { $("#status").text("disconnected"); }; - ws.onmessage = function (evt) { - var d = JSON.parse(evt.data); + + function onMessage(d) { if (d.clients !== undefined) { $("#clients").empty().text(JSON.stringify(d.clients)); } @@ -72,8 +75,27 @@ ); } - $('#out').append($('<div>').text(evt.data)); - }; + $('#out').append($('<div>').text(JSON.stringify(d))); + } + + var pong = 0; + function connect() { + var ws = new WebSocket("ws://localhost:8051/live"); + + ws.onopen = function() { $("#status").text("connected"); }; + ws.onerror = function(e) { $("#status").text("error: "+e); }; + ws.onclose = function() { + pong = 1 - pong; + $("#status").text("disconnected (retrying "+(pong ? "😼":"😺")+")"); + // this should be under a requestAnimationFrame to + // save resources + setTimeout(connect, 2000); + }; + ws.onmessage = function (evt) { + onMessage(JSON.parse(evt.data)); + }; + } + connect(); }); // ]]> </script>
--- a/light9/rdfdb/graphfile.py Fri Jul 13 19:25:03 2012 +0000 +++ b/light9/rdfdb/graphfile.py Mon Jul 16 00:49:57 2012 +0000 @@ -24,10 +24,24 @@ """update the graph with any diffs from this file""" old = self.getSubgraph(self.uri) new = Graph() - new.parse(location=self.path, format='n3') + try: + new.parse(location=self.path, format='n3') + except SyntaxError as e: + print e + log.error("syntax error in %s" % self.path) + return adds = [(s, p, o, self.uri) for s, p, o in new - old] dels = [(s, p, o, self.uri) for s, p, o in old - new] + print "file dels" + for s in dels: + print s + print "file adds" + for s in adds: + print s + print "" + + if adds or dels: self.patch(Patch(addQuads=adds, delQuads=dels))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/light9/rdfdb/rdflibpatch.py Mon Jul 16 00:49:57 2012 +0000 @@ -0,0 +1,81 @@ +""" +this is a proposal for a ConjunctiveGraph method in rdflib +""" + +def patchQuads(graph, deleteQuads, addQuads, perfect=False): + """ + Delete the sequence of given quads. Then add the given quads just + like addN would. If perfect is True, we'll error and not touch the + graph if any of the deletes isn't in the graph or if any of the + adds was already in the graph. + """ + toDelete = [] + for s, p, o, c in deleteQuads: + stmt = (s, p, o) + if perfect: + if not any(graph.store.triples(stmt, c)): + raise ValueError("%r not in %r" % (stmt, c)) + else: + toDelete.append((c, stmt)) + else: + graph.store.remove(stmt, context=c) + for c, stmt in toDelete: + graph.store.remove(stmt, context=c) + + if perfect: + addQuads = list(addQuads) + for spoc in addQuads: + if spoc in graph: + raise ValueError("%r already in %r" % (spoc[:3], spoc[3])) + graph.addN(addQuads) + +import unittest +from rdflib import ConjunctiveGraph, URIRef as U +stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1') +stmt2 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx2') +class TestPatchQuads(unittest.TestCase): + def testAddsToNewContext(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + self.assert_(len(g), 1) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, [stmt1]) + + def testDeletes(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + patchQuads(g, [stmt1], []) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, []) + + def testDeleteRunsBeforeAdd(self): + g = ConjunctiveGraph() + patchQuads(g, [stmt1], [stmt1]) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, [stmt1]) + + def testPerfectAddRejectsExistingStmt(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + self.assertRaises(ValueError, patchQuads, g, [], [stmt1], perfect=True) + + def testPerfectAddAllowsExistingStmtInNewContext(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + patchQuads(g, [], [stmt2], perfect=True) + self.assertEqual(len(list(g.quads((None,None,None)))), 2) + + def testPerfectDeleteRejectsAbsentStmt(self): + g = ConjunctiveGraph() + self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True) + + def testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1, stmt2]) + patchQuads(g, [stmt1], [], perfect=True) + + def testRedundantStmtOkForAddOrDelete(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1, stmt1], perfect=True) + patchQuads(g, [stmt1, stmt1], [], perfect=True) +
--- a/light9/rdfdb/syncedgraph.py Fri Jul 13 19:25:03 2012 +0000 +++ b/light9/rdfdb/syncedgraph.py Mon Jul 16 00:49:57 2012 +0000 @@ -1,8 +1,9 @@ -from rdflib import ConjunctiveGraph +from rdflib import ConjunctiveGraph, RDFS import logging, cyclone.httpclient, traceback, urllib from twisted.internet import reactor log = logging.getLogger() from light9.rdfdb.patch import Patch, ALLSTMTS +from light9.rdfdb.rdflibpatch import patchQuads def sendPatch(putUri, patch): # this will take args for sender, etc @@ -38,6 +39,9 @@ return Update class GraphWatchers(object): + """ + store the current handlers that care about graph changes + """ def __init__(self): self._handlersSp = {} # (s,p): set(handlers) @@ -45,15 +49,35 @@ if func is None: return key = s, p - self._handlersSp.setdefault(key, set()).add(func) + try: + self._handlersSp.setdefault(key, set()).add(func) + except Exception: + print "with key %r and func %r" % (key, func) + raise - def whoCares(self, p): - """what functions would care about the changes in this patch""" + def whoCares(self, patch): + """what handler functions would care about the changes in this patch""" + self.dependencies() + affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+ + [(s, p) for s, p, o, c in patch.delQuads]) + ret = set() - for s in self._handlersSp.values(): - ret.update(s) + for (s,p), func in self._handlersSp.iteritems(): + if (s,p) in affectedSubjPreds: + ret.update(func) return ret + def dependencies(self): + """ + for debugging, make a list of all the active handlers and what + data they depend on. This is meant for showing on the web ui + for browsing. + """ + print "whocares" + from pprint import pprint + pprint(self._handlersSp) + + class SyncedGraph(object): """ graph for clients to use. Changes are synced with the master graph @@ -71,9 +95,7 @@ self._watchers = GraphWatchers() def onPatch(p): - for spoc in p.delGraph.quads(ALLSTMTS): - _graph.get_context(spoc[3]).remove(spoc[:3]) - _graph.addN(p.addGraph.quads(ALLSTMTS)) + patchQuads(_graph, p.delQuads, p.addQuads) log.info("graph now has %s statements" % len(_graph)) try: self.updateOnPatch(p) @@ -89,10 +111,7 @@ self.updateResource = 'http://localhost:%s/update' % port log.info("listening on %s" % port) self.register(label) - - def updateOnPatch(self, p): - for func in self._watchers.whoCares(p): - self.addHandler(func) + self.currentFunc = None def register(self, label): @@ -140,13 +159,32 @@ finally: self.currentFunc = None + def updateOnPatch(self, p): + """ + patch p just happened to the graph; call everyone back who + might care, and then notice what data they depend on now + """ + for func in self._watchers.whoCares(p): + # and forget the old one! + self.addHandler(func) + + def _assertCurrent(self): + if self.currentFunc is None: + # this may become a warning later + raise ValueError("asked for graph data outside of a handler") + # these just call through to triples() so it might be possible to # watch just that one def value(self, subj, pred): + self._assertCurrent() self._watchers.addSubjPredWatcher(self.currentFunc, subj, pred) return self._graph.value(subj, pred) def objects(self, subject=None, predicate=None): + self._assertCurrent() self._watchers.addSubjPredWatcher(self.currentFunc, subject, predicate) return self._graph.objects(subject, predicate) + def label(self, uri): + self._assertCurrent() + return self.value(uri, RDFS.label)