# HG changeset patch
# User drewp@bigasterisk.com
# Date 2012-07-19 04:23:06
# Node ID a631e075a5bf213dc9b3a3fa2ab97f1dbfb67059
# Parent 3fe281a3be70d88c0544d94076fd854a3e1380c6
KC big rewrites, now multiple KC instances can sync with rdfdb
Ignore-this: 8c75ec3e2bd360c6eb87f7f4d4b3dcc4
diff --git a/bin/keyboardcomposer b/bin/keyboardcomposer
--- a/bin/keyboardcomposer
+++ b/bin/keyboardcomposer
@@ -1,12 +1,13 @@
#!bin/python
from __future__ import division, nested_scopes
-import cgi, os, sys, time, subprocess, logging
+import cgi, os, sys, time, subprocess, logging, random
from optparse import OptionParser
import webcolors, colorsys
from louie import dispatcher
from twisted.internet import reactor, tksupport
from twisted.web import xmlrpc, server, resource
+from rdflib import URIRef, Literal, RDF
from Tix import *
import Tix as tk
import pickle
@@ -20,6 +21,7 @@ from light9.uihelpers import toplevelat,
from light9.namespaces import L9
from light9.tkdnd import initTkdnd, dragSourceRegister
from light9.rdfdb.syncedgraph import SyncedGraph
+from light9.rdfdb.patch import Patch
from bcf2000 import BCF2000
@@ -55,8 +57,12 @@ class SubScale(Scale, Fadable):
else:
self['troughcolor'] = 'blue'
-class SubmasterTk(Frame):
- def __init__(self, master, sub, current_level):
+class SubmasterBox(Frame):
+ """
+ this object owns the level of the submaster (the rdf graph is the
+ real authority)
+ """
+ def __init__(self, master, sub):
self.sub = sub
bg = sub.graph.value(sub.uri, L9.color, default='#000000')
rgb = webcolors.hex_to_rgb(bg)
@@ -66,7 +72,7 @@ class SubmasterTk(Frame):
Frame.__init__(self, master, bd=1, relief='raised', bg=bg)
self.name = sub.name
self.slider_var = DoubleVar()
- self.slider_var.set(current_level)
+ self.pauseTrace = False
self.scale = SubScale(self, variable=self.slider_var, width=20)
self.namelabel = Label(self, font="Arial 7", bg=darkBg,
@@ -83,6 +89,66 @@ class SubmasterTk(Frame):
for w in [self, self.namelabel, levellabel]:
dragSourceRegister(w, 'copy', 'text/uri-list', sub.uri)
+ self._slider_var_trace = self.slider_var.trace('w', self.slider_changed)
+
+ sub.graph.addHandler(self.updateLevelFromGraph)
+
+ # initial position
+# self.send_to_hw(sub.name, col + 1) # needs fix
+
+ def cleanup(self):
+ self.slider_var.trace_vdelete('w', self._slider_var_trace)
+
+ def slider_changed(self, *args):
+ self.scale.draw_indicator_colors()
+
+ if self.pauseTrace:
+ return
+ self.updateGraphWithLevel(self.sub.uri, self.slider_var.get())
+ # dispatcher.send("level changed") # in progress
+ ###self.send_levels() # use dispatcher?
+
+ # needs fixing: plan is to use dispatcher or a method call to tell a hardware-mapping object who changed, and then it can make io if that's a current hw slider
+ #if rowcount == self.current_row:
+ # self.send_to_hw(sub.name, col + 1)
+
+ def updateGraphWithLevel(self, uri, level):
+ """in our per-session graph, we maintain SubSetting objects like this:
+
+ [a :SubSetting; :sub ?s; :level ?l]
+ """
+ ctx = URIRef("http://example.com/kc/session1")
+ with self.sub.graph.currentState(context=ctx) as graph:
+ adds = set([])
+ for setting in graph.subjects(RDF.type, L9['SubSetting']):
+ if graph.value(setting, L9['sub']) == uri:
+ break
+ else:
+ setting = URIRef("http://example.com/subsetting/%s" %
+ random.randrange(999999))
+ adds.update([
+ (setting, RDF.type, L9['SubSetting'], ctx),
+ (setting, L9['sub'], uri, ctx),
+ ])
+ dels = set([])
+ for prev in graph.objects(setting, L9['level']):
+ dels.add((setting, L9['level'], prev, ctx))
+ adds.add((setting, L9['level'], Literal(level), ctx))
+
+ if adds != dels:
+ self.sub.graph.patch(Patch(delQuads=dels, addQuads=adds))
+
+ def updateLevelFromGraph(self):
+ """read rdf level, write it to subbox.slider_var"""
+ graph = self.sub.graph
+ for setting in graph.subjects(RDF.type, L9['SubSetting']):
+ if graph.value(setting, L9['sub']) == self.sub.uri:
+ self.pauseTrace = True # don't bounce this update back to server
+ try:
+ self.slider_var.set(graph.value(setting, L9['level']))
+ finally:
+ self.pauseTrace = False
+
def updateName(self):
self.namelabel.config(text=self.sub.graph.label(self.sub.uri))
@@ -90,23 +156,17 @@ class SubmasterTk(Frame):
subprocess.Popen(["bin/subcomposer", "--no-geometry", self.name])
class KeyboardComposer(Frame, SubClient):
- def __init__(self, root, graph, current_sub_levels=None,
+ def __init__(self, root, graph,
hw_sliders=True):
Frame.__init__(self, root, bg='black')
SubClient.__init__(self)
self.graph = graph
self.submasters = Submasters(graph)
- self.name_to_subtk = {}
- self.current_sub_levels = {}
- self.current_row = 0
- if current_sub_levels is not None:
- self.current_sub_levels = current_sub_levels
- else:
- try:
- self.current_sub_levels, self.current_row = \
- pickle.load(file('.keyboardcomposer.savedlevels'))
- except IOError:
- pass
+ self.subbox = {} # sub uri : SubmasterBox
+ self.slider_table = {} # coords : SubmasterBox
+ self.rows = [] # this holds Tk Frames for each row
+
+ self.current_row = 0 # should come from session graph
self.use_hw_sliders = hw_sliders
self.connect_to_hw(hw_sliders)
@@ -141,10 +201,6 @@ class KeyboardComposer(Frame, SubClient)
self.sub_name.pack(side=LEFT)
def redraw_sliders(self):
- self.slider_vars = {} # this holds suburi : sub Tk vars
- self.slider_table = {} # this holds coords:sub Tk vars
- self.name_to_subtk.clear() # subname : SubmasterTk instance
-
self.graph.addHandler(self.draw_sliders)
if len(self.rows):
self.change_row(self.current_row)
@@ -161,13 +217,13 @@ class KeyboardComposer(Frame, SubClient)
self.graph.addHandler(self.draw_sliders)
def draw_sliders(self):
-
-
- if hasattr(self, 'rows'):
- for r in self.rows:
- r.destroy()
- self.rows = [] # this holds Tk Frames for each row
-
+ for r in self.rows:
+ r.destroy()
+ self.rows = []
+ for b in self.subbox.values():
+ b.cleanup()
+ self.subbox.clear()
+ self.slider_table.clear()
self.tk_focusFollowsMouse()
@@ -178,7 +234,6 @@ class KeyboardComposer(Frame, SubClient)
# there are unlikely to be any subs at startup because we
# probably haven't been called back with the graph data yet
- #read get_all_subs then watch 'new submaster' 'lost submaster' signals
withgroups = sorted((self.graph.value(sub.uri, L9['group']),
self.graph.value(sub.uri, L9['order']),
sub)
@@ -190,37 +245,20 @@ class KeyboardComposer(Frame, SubClient)
for group, order, sub in withgroups:
group = self.graph.value(sub.uri, L9['group'])
- if col == 0 or group != last_group: # make new row
+ if col == 0 or group != last_group:
row = self.make_row()
rowcount += 1
col = 0
- current_level = self.current_sub_levels.get(sub.name, 0)
- subtk = self.draw_sub_slider(row, col, sub, current_level)
- self.slider_table[(rowcount, col)] = subtk
- self.name_to_subtk[sub.name] = subtk
- def slider_changed(x, y, z, subtk=subtk,
- col=col, sub=sub, rowcount=rowcount):
- subtk.scale.draw_indicator_colors()
- self.send_levels()
- if rowcount == self.current_row:
- self.send_to_hw(sub.name, col + 1)
+ subbox = SubmasterBox(row, sub)
+ subbox.place(relx=col / 8, rely=0, relwidth=1 / 8, relheight=1)
+ self.subbox[sub.uri] = self.slider_table[(rowcount, col)] = subbox
- subtk.slider_var.trace('w', slider_changed)
+ self.setup_key_nudgers(subbox.scale)
- # initial position
- self.send_to_hw(sub.name, col + 1)
col = (col + 1) % 8
last_group = group
-
- def draw_sub_slider(self, row, col, sub, current_level):
- subtk = SubmasterTk(row, sub, current_level)
- subtk.place(relx=col / 8, rely=0, relwidth=1 / 8, relheight=1)
- self.setup_key_nudgers(subtk.scale)
-
- self.slider_vars[sub.uri] = subtk.slider_var
- return subtk
-
+
def toggle_slider_connectedness(self):
self.use_hw_sliders = not self.use_hw_sliders
if self.use_hw_sliders:
@@ -301,7 +339,7 @@ class KeyboardComposer(Frame, SubClient)
for col in range(1, 9):
try:
- subtk = self.slider_table[(self.current_row, col - 1)]
+ subbox = self.slider_table[(self.current_row, col - 1)]
self.sliders.valueOut("button-upper%d" % col, True)
except KeyError:
# unfilled bottom row has holes (plus rows with incomplete
@@ -309,32 +347,32 @@ class KeyboardComposer(Frame, SubClient)
self.sliders.valueOut("button-upper%d" % col, False)
self.sliders.valueOut("slider%d" % col, 0)
continue
- self.send_to_hw(subtk.name, col)
+ self.send_to_hw(subbox.name, col)
def got_nudger(self, number, direction, full=0):
try:
- subtk = self.slider_table[(self.current_row, number)]
+ subbox = self.slider_table[(self.current_row, number)]
except KeyError:
return
if direction == 'up':
if full:
- subtk.scale.fade(1)
+ subbox.scale.fade(1)
else:
- subtk.scale.increase()
+ subbox.scale.increase()
else:
if full:
- subtk.scale.fade(0)
+ subbox.scale.fade(0)
else:
- subtk.scale.decrease()
+ subbox.scale.decrease()
def hw_slider_moved(self, col, value):
value = int(value * 100) / 100
try:
- subtk = self.slider_table[(self.current_row, col)]
+ subbox = self.slider_table[(self.current_row, col)]
except KeyError:
return # no slider assigned at that column
- subtk.scale.set(value)
+ subbox.scale.set(value)
def send_to_hw(self, subUri, hwNum):
if isinstance(self.sliders, DummySliders):
@@ -367,8 +405,8 @@ class KeyboardComposer(Frame, SubClient)
row['bg'] = 'black'
def get_levels(self):
- return dict([(uri, slidervar.get())
- for uri, slidervar in self.slider_vars.items()])
+ return dict([(uri, box.slider_var.get())
+ for uri, box in self.subbox.items()])
def get_levels_as_sub(self):
scaledsubs = [self.submasters.get_sub_by_uri(sub) * level
@@ -386,6 +424,7 @@ class KeyboardComposer(Frame, SubClient)
def save(self):
pickle.dump((self.get_levels(), self.current_row),
file('.keyboardcomposer.savedlevels', 'w'))
+
def send_frequent_updates(self):
"""called when we get a fade -- send events as quickly as possible"""
if time.time() <= self.stop_frequent_update_time:
@@ -393,9 +432,9 @@ class KeyboardComposer(Frame, SubClient)
self.after(10, self.send_frequent_updates)
def alltozero(self):
- for name, subtk in self.name_to_subtk.items():
- if subtk.scale.scale_var.get() != 0:
- subtk.scale.fade(value=0.0, length=0)
+ for uri, subbox in self.subbox.items():
+ if subbox.scale.scale_var.get() != 0:
+ subbox.scale.fade(value=0.0, length=0)
# move to web lib
def postArgGetter(request):
@@ -416,15 +455,15 @@ def postArgGetter(request):
class LevelServerHttp(resource.Resource):
isLeaf = True
- def __init__(self,name_to_subtk):
- self.name_to_subtk = name_to_subtk
+ def __init__(self,name_to_subbox):
+ self.name_to_subbox = name_to_subbox
def render_POST(self, request):
arg = postArgGetter(request)
if request.path == '/fadesub':
# fadesub?subname=scoop&level=0&secs=.2
- self.name_to_subtk[arg('subname')].scale.fade(
+ self.name_to_subbox[arg('subname')].scale.fade(
float(arg('level')),
float(arg('secs')))
return "set %s to %s" % (arg('subname'), arg('level'))
@@ -477,8 +516,6 @@ class Sliders(BCF2000):
if __name__ == "__main__":
parser = OptionParser()
- parser.add_option('--nonpersistent', action="store_true",
- help="don't load or save levels")
parser.add_option('--no-sliders', action='store_true',
help="don't attach to hardware sliders")
parser.add_option('-v', action='store_true', help="log info level")
@@ -494,10 +531,7 @@ if __name__ == "__main__":
tl = toplevelat("Keyboard Composer", existingtoplevel=root)
- startLevels = None
- if opts.nonpersistent:
- startLevels = {}
- kc = KeyboardComposer(tl, graph, startLevels,
+ kc = KeyboardComposer(tl, graph,
hw_sliders=not opts.no_sliders)
kc.pack(fill=BOTH, expand=1)
@@ -505,17 +539,16 @@ if __name__ == "__main__":
tk.Label(root,text=helpline, font="Helvetica -12 italic",
anchor='w').pack(side='top',fill='x')
- import twisted.internet
- try:
- reactor.listenTCP(networking.keyboardComposer.port,
- server.Site(LevelServerHttp(kc.name_to_subtk)))
- except twisted.internet.error.CannotListenError, e:
- log.warn("Can't (and won't!) start level server:")
- log.warn(e)
+ if 0: # needs fixing, or maybe it's obsolete because other progs can just patch the rdf graph
+ import twisted.internet
+ try:
+ reactor.listenTCP(networking.keyboardComposer.port,
+ server.Site(LevelServerHttp(kc.name_to_subbox)))
+ except twisted.internet.error.CannotListenError, e:
+ log.warn("Can't (and won't!) start level server:")
+ log.warn(e)
root.protocol('WM_DELETE_WINDOW', reactor.stop)
- if not opts.nonpersistent:
- reactor.addSystemEventTrigger('after', 'shutdown', kc.save)
tksupport.install(root,ms=10)
diff --git a/bin/rdfdb b/bin/rdfdb
--- a/bin/rdfdb
+++ b/bin/rdfdb
@@ -95,7 +95,7 @@ import twisted.internet.error
import sys, optparse, logging, json, os
import cyclone.web, cyclone.httpclient, cyclone.websocket
sys.path.append(".")
-from light9 import networking, showconfig
+from light9 import networking, showconfig, prof
from rdflib import ConjunctiveGraph, URIRef, Graph
from light9.rdfdb.graphfile import GraphFile
from light9.rdfdb.patch import Patch, ALLSTMTS
@@ -167,9 +167,13 @@ class Db(object):
log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
-
+ senderUpdateUri = getattr(p, 'senderUpdateUri', None)
self.summarizeToLog()
for c in self.clients:
+ print "send to %s? %s %s" % (c, c.updateUri, senderUpdateUri)
+ if c.updateUri == senderUpdateUri:
+ # this client has self-applied the patch already
+ continue
d = c.sendPatch(p)
d.addErrback(self.clientErrored, c)
sendToLiveClients(asJson=p.jsonRepr)
@@ -293,4 +297,4 @@ if __name__ == "__main__":
], db=db))
log.info("serving on %s" % port)
- reactor.run()
+ prof.run(reactor.run, profile=False)
diff --git a/light9/Submaster.py b/light9/Submaster.py
--- a/light9/Submaster.py
+++ b/light9/Submaster.py
@@ -150,18 +150,18 @@ class PersistentSubmaster(Submaster):
log.info("sub %s changed" % self.name)
def setLevelsFromGraph(self):
- patchGraph = showconfig.getGraph() # going away
if hasattr(self, 'levels'):
self.levels.clear()
else:
self.levels = {}
for lev in self.graph.objects(self.uri, L9['lightLevel']):
chan = self.graph.value(lev, L9['channel'])
- val = self.graph.value(lev, L9['level'])
- name = patchGraph.label(chan)
+
+ name = self.graph.label(chan)
if not name:
log.error("sub %r has channel %r with no name- leaving out that channel" % (self.name, chan))
continue
+ val = self.graph.value(lev, L9['level'])
self.levels[name] = float(val)
def save(self):
diff --git a/light9/rdfdb/patch.py b/light9/rdfdb/patch.py
--- a/light9/rdfdb/patch.py
+++ b/light9/rdfdb/patch.py
@@ -1,18 +1,13 @@
-import json
-from rdflib import ConjunctiveGraph
+import json, unittest
+from rdflib import ConjunctiveGraph, URIRef
+from light9.rdfdb.rdflibpatch import graphFromNQuad, graphFromQuads, serializeQuad
ALLSTMTS = (None, None, None)
-def graphFromQuads(q):
- g = ConjunctiveGraph()
- #g.addN(q) # no effect on nquad output
- for s,p,o,c in q:
- g.get_context(c).add((s,p,o))
- #g.store.add((s,p,o), c) # no effect on nquad output
- return g
-
class Patch(object):
"""
+ immutable
+
the json representation includes the {"patch":...} wrapper
"""
def __init__(self, jsonRepr=None, addQuads=None, delQuads=None,
@@ -23,10 +18,10 @@ class Patch(object):
if self._jsonRepr is not None:
body = json.loads(self._jsonRepr)
- self._delGraph = ConjunctiveGraph()
- self._delGraph.parse(data=body['patch']['deletes'], format='nquads')
- self._addGraph = ConjunctiveGraph()
- self._addGraph.parse(data=body['patch']['adds'], format='nquads')
+ self._delGraph = graphFromNQuad(body['patch']['deletes'])
+ self._addGraph = graphFromNQuad(body['patch']['adds'])
+ if 'senderUpdateUri' in body:
+ self.senderUpdateUri = body['senderUpdateUri']
@property
def addQuads(self):
@@ -61,8 +56,39 @@ class Patch(object):
@property
def jsonRepr(self):
if self._jsonRepr is None:
- self._jsonRepr = json.dumps({"patch": {
- 'adds':self.addGraph.serialize(format='nquads'),
- 'deletes':self.delGraph.serialize(format='nquads'),
- }})
+ self._jsonRepr = self.makeJsonRepr()
return self._jsonRepr
+
+ def makeJsonRepr(self, extraAttrs={}):
+ d = {"patch" : {
+ 'adds' : serializeQuad(self.addGraph),
+ 'deletes' : serializeQuad(self.delGraph),
+ }}
+ if len(self.addGraph) > 0 and d['patch']['adds'].strip() == "":
+ # this is the bug that graphFromNQuad works around
+ raise ValueError("nquads serialization failure")
+ if '[<' in d['patch']['adds']:
+ raise ValueError("[< found in %s" % d['patch']['adds'])
+ d.update(extraAttrs)
+ return json.dumps(d)
+
+ def concat(self, more):
+ """
+ new Patch with the result of applying this patch and the
+ sequence of other Patches
+ """
+ # not working yet
+ adds = set(self.addQuads)
+ dels = set(self.delQuads)
+ for p2 in more:
+ for q in p2.delQuads:
+ if q in adds:
+ adds.remove(q)
+ else:
+ dels.add(q)
+ for q in p2.addQuads:
+ if q in dels:
+ dels.remove(q)
+ else:
+ adds.add(q)
+ return Patch(delQuads=dels, addQuads=adds)
diff --git a/light9/rdfdb/rdflibpatch.py b/light9/rdfdb/rdflibpatch.py
--- a/light9/rdfdb/rdflibpatch.py
+++ b/light9/rdfdb/rdflibpatch.py
@@ -1,6 +1,8 @@
"""
this is a proposal for a ConjunctiveGraph method in rdflib
"""
+import unittest
+from rdflib import ConjunctiveGraph, URIRef as U
def patchQuads(graph, deleteQuads, addQuads, perfect=False):
"""
@@ -29,8 +31,57 @@ def patchQuads(graph, deleteQuads, addQu
raise ValueError("%r already in %r" % (spoc[:3], spoc[3]))
graph.addN(addQuads)
-import unittest
-from rdflib import ConjunctiveGraph, URIRef as U
+
+
+def graphFromQuads(q):
+ g = ConjunctiveGraph()
+ #g.addN(q) # no effect on nquad output
+ for s,p,o,c in q:
+ #g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code
+ g.store.add((s,p,o), c) # no effect on nquad output
+ return g
+
+def graphFromNQuad(text):
+ """
+ g.parse(data=self.nqOut, format='nquads')
+ makes a graph that serializes to nothing
+ """
+ g1 = ConjunctiveGraph()
+ g1.parse(data=text, format='nquads')
+ g2 = ConjunctiveGraph()
+ for s,p,o,c in g1.quads((None,None,None)):
+ #g2.get_context(c).add((s,p,o))
+ g2.store.add((s,p,o), c)
+ #import pprint; pprint.pprint(g2.store.__dict__)
+ return g2
+
+from rdflib.plugins.serializers.nt import _xmlcharref_encode
+def serializeQuad(g):
+ """replacement for graph.serialize(format='nquads')"""
+ out = ""
+ for s,p,o,c in g.quads((None,None,None)):
+ out += u"%s %s %s %s .\n" % (s.n3(),
+ p.n3(),
+ _xmlcharref_encode(o.n3()),
+ c.n3())
+ return out
+
+class TestGraphFromQuads(unittest.TestCase):
+ nqOut = ' .\n'
+ def testSerializes(self):
+ n = U("http://example.com/")
+ g = graphFromQuads([(n,n,n,n)])
+ out = serializeQuad(g)
+ self.assertEqual(out.strip(), self.nqOut.strip())
+
+ def testNquadParserSerializes(self):
+ g = graphFromNQuad(self.nqOut)
+ self.assertEqual(len(g), 1)
+ out = serializeQuad(g)
+ self.assertEqual(out.strip(), self.nqOut.strip())
+
+
+
stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1')
stmt2 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx2')
class TestPatchQuads(unittest.TestCase):
diff --git a/light9/rdfdb/syncedgraph.py b/light9/rdfdb/syncedgraph.py
--- a/light9/rdfdb/syncedgraph.py
+++ b/light9/rdfdb/syncedgraph.py
@@ -1,14 +1,16 @@
-from rdflib import ConjunctiveGraph, RDFS, RDF
+from rdflib import ConjunctiveGraph, RDFS, RDF, Graph
import logging, cyclone.httpclient, traceback, urllib
from twisted.internet import reactor
log = logging.getLogger('syncedgraph')
from light9.rdfdb.patch import Patch, ALLSTMTS
from light9.rdfdb.rdflibpatch import patchQuads
-def sendPatch(putUri, patch):
- # this will take args for sender, etc
- body = patch.jsonRepr
- log.debug("send body: %r" % body)
+def sendPatch(putUri, patch, **kw):
+ """
+ kwargs will become extra attributes in the toplevel json object
+ """
+ body = patch.makeJsonRepr(kw)
+ log.debug("send body: %r", body)
def ok(done):
if not str(done.code).startswith('2'):
raise ValueError("sendPatch request failed %s: %s" % (done.code, done.body))
@@ -64,7 +66,7 @@ class GraphWatchers(object):
this removes the handlers that it gives you
"""
- self.dependencies()
+ #self.dependencies()
affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+
[(s, p) for s, p, o, c in patch.delQuads])
affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+
@@ -94,6 +96,58 @@ class GraphWatchers(object):
pprint(self._handlersSp)
+class PatchSender(object):
+ """
+ SyncedGraph may generate patches faster than we can send
+ them. This object buffers and may even collapse patches before
+ they go the server
+ """
+ def __init__(self, target, myUpdateResource):
+ self.target = target
+ self.myUpdateResource = myUpdateResource
+ self._patchesToSend = []
+ self._currentSendPatchRequest = None
+
+ def sendPatch(self, p):
+ self._patchesToSend.append(p)
+ self._continueSending()
+
+ def _continueSending(self):
+ if not self._patchesToSend or self._currentSendPatchRequest:
+ return
+ if len(self._patchesToSend) > 1:
+ log.info("%s patches left to send", len(self._patchesToSend))
+ # this is where we could concatenate little patches into a
+ # bigger one. Often, many statements will cancel each
+ # other out. not working yet:
+ if 0:
+ p = self._patchesToSend[0].concat(self._patchesToSend[1:])
+ print "concat down to"
+ print 'dels'
+ for q in p.delQuads: print q
+ print 'adds'
+ for q in p.addQuads: print q
+ print "----"
+ else:
+ p = self._patchesToSend.pop(0)
+ else:
+ p = self._patchesToSend.pop(0)
+
+ self._currentSendPatchRequest = sendPatch(
+ self.target, p, senderUpdateUri=self.myUpdateResource)
+ self._currentSendPatchRequest.addCallbacks(self._sendPatchDone,
+ self._sendPatchErr)
+
+ def _sendPatchDone(self, result):
+ self._currentSendPatchRequest = None
+ self._continueSending()
+
+ def _sendPatchErr(self, e):
+ self._currentSendPatchRequest = None
+ log.error(e)
+ self._continueSending()
+
+
class SyncedGraph(object):
"""
graph for clients to use. Changes are synced with the master graph
@@ -111,7 +165,10 @@ class SyncedGraph(object):
self._watchers = GraphWatchers()
def onPatch(p):
- patchQuads(_graph, p.delQuads, p.addQuads)
+ """
+ central server has sent us a patch
+ """
+ patchQuads(_graph, p.delQuads, p.addQuads, perfect=True)
log.info("graph now has %s statements" % len(_graph))
try:
self.updateOnPatch(p)
@@ -128,6 +185,8 @@ class SyncedGraph(object):
log.info("listening on %s" % port)
self.register(label)
self.currentFuncs = [] # stack of addHandler callers
+ self._sender = PatchSender('http://localhost:8051/patches',
+ self.updateResource)
def register(self, label):
@@ -144,10 +203,21 @@ class SyncedGraph(object):
log.info("registering with rdfdb")
def patch(self, p):
- """send this patch to the server and apply it to our local graph and run handlers"""
- # currently this has to round-trip. But I could apply the
- # patch here and have the server not bounce it back to me
- return sendPatch('http://localhost:8051/patches', p)
+ """send this patch to the server and apply it to our local
+ graph and run handlers"""
+ patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+ self.updateOnPatch(p)
+ self._sender.sendPatch(p)
+
+ def patchObject(self, context, subject, predicate, newObject):
+ """send a patch which removes existing values for (s,p,*,c)
+ and adds (s,p,newObject,c). Values in other graphs are not affected"""
+ raise NotImplementedError
+ existing = []
+
+ self.patch(Patch(
+ delQuads=[],
+ addQuads=[(subject, predicate, newObject, context)]))
def addHandler(self, func):
"""
@@ -184,6 +254,23 @@ class SyncedGraph(object):
# todo: forget the old handlers for this func
self.addHandler(func)
+ def currentState(self, context=None):
+ """
+ a graph you can read without being in an addHandler
+ """
+ class Mgr(object):
+ def __enter__(self2):
+ # this should be a readonly view of the existing graph
+ g = Graph()
+ for s in self._graph.triples((None, None, None), context):
+ g.add(s)
+ return g
+
+ def __exit__(self, type, val, tb):
+ return
+
+ return Mgr()
+
def _getCurrentFunc(self):
if not self.currentFuncs:
# this may become a warning later