diff --git a/bin/keyboardcomposer b/bin/keyboardcomposer --- a/bin/keyboardcomposer +++ b/bin/keyboardcomposer @@ -119,7 +119,7 @@ class SubmasterBox(Frame): ?session :subSetting [a :SubSetting; :sub ?s; :level ?l] """ - + # move to syncedgraph patchMapping with self.sub.graph.currentState() as graph: adds = set([]) for setting in graph.objects(self.session, L9['subSetting']): @@ -127,7 +127,7 @@ class SubmasterBox(Frame): break else: setting = URIRef(self.session + "/setting/%s" % - random.randrange(999999)) + random.randrange(999999999)) adds.update([ (self.session, L9['subSetting'], setting, self.session), (setting, RDF.type, L9['SubSetting'], self.session), @@ -143,7 +143,7 @@ class SubmasterBox(Frame): def updateLevelFromGraph(self): """read rdf level, write it to subbox.slider_var""" - + # move this to syncedgraph readMapping graph = self.sub.graph for setting in graph.objects(self.session, L9['subSetting']): diff --git a/bin/rdfdb b/bin/rdfdb --- a/bin/rdfdb +++ b/bin/rdfdb @@ -155,27 +155,62 @@ class Db(object): the master graph, all the connected clients, all the files we're watching """ def __init__(self): + # files from cwd become uris starting with this. *should* be + # building uris from the show uri in $LIGHT9_SHOW/URI + # instead. Who wants to keep their data in the same dir tree + # as the source code?! + self.topUri = URIRef("http://light9.bigasterisk.com/") + self.clients = [] self.graph = ConjunctiveGraph() - notifier = INotify() - notifier.startReading() + self.notifier = INotify() + self.notifier.startReading() + self.graphFiles = {} # context uri : GraphFile + + self.findAndLoadFiles() + + def findAndLoadFiles(self): + self.initialLoad = True + try: + dirs = [ + "show/dance2012/sessions", + "show/dance2012/subs", + "show/dance2012/subterms", + ] + + for topdir in dirs: + for dirpath, dirnames, filenames in os.walk(topdir): + for base in filenames: + self.watchFile(os.path.join(dirpath, base)) + # todo: also notice new files in this dir - for inFile in [#"show/dance2012/config.n3", - "show/dance2012/patch.n3", - "show/dance2012/subs/bcools", - "show/dance2012/subs/bwarm", - "show/dance2012/subs/house", - "demo.n3", - ]: - self.g = GraphFile(notifier, - inFile, - URIRef("http://example.com/file/%s" % - os.path.basename(inFile)), - self.patch, - self.getSubgraph) + self.watchFile("show/dance2012/config.n3") + self.watchFile("show/dance2012/patch.n3") + finally: + self.initialLoad = False + + self.summarizeToLog() - def patch(self, p): + def uriFromFile(self, filename): + if filename.endswith('.n3'): + # some legacy files don't end with n3. when we write them + # back this might not go so well + filename = filename[:-len('.n3')] + return URIRef(self.topUri + filename) + + def fileForUri(self, ctx): + if not ctx.startswith(self.topUri): + raise ValueError("don't know what filename to use for %s" % ctx) + return ctx[len(self.topUri):] + ".n3" + + def watchFile(self, inFile): + ctx = self.uriFromFile(inFile) + gf = GraphFile(self.notifier, inFile, ctx, self.patch, self.getSubgraph) + self.graphFiles[ctx] = gf + gf.reread() + + def patch(self, p, dueToFileChange=False): """ apply this patch to the master graph then notify everyone about it @@ -185,20 +220,43 @@ class Db(object): if p has a senderUpdateUri attribute, we won't send this patch back to the sender with that updateUri """ - log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads))) + ctx = p.getContext() + log.info("patching graph %s -%d +%d" % ( + ctx, len(p.delQuads), len(p.addQuads))) patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True) senderUpdateUri = getattr(p, 'senderUpdateUri', None) - self.summarizeToLog() + #if not self.initialLoad: + # self.summarizeToLog() for c in self.clients: - print "send to %s? %s %s" % (c, c.updateUri, senderUpdateUri) if c.updateUri == senderUpdateUri: # this client has self-applied the patch already continue d = c.sendPatch(p) d.addErrback(self.clientErrored, c) + if not dueToFileChange: + self.dirtyFiles([ctx]) sendToLiveClients(asJson=p.jsonRepr) + def dirtyFiles(self, ctxs): + """mark dirty the files that we watch in these contexts. + + the ctx might not be a file that we already read; it might be + for a new file we have to create, or it might be for a + transient context that we're not going to save + + if it's a ctx with no file, error + """ + for ctx in ctxs: + g = self.getSubgraph(ctx) + + if ctx not in self.graphFiles: + outFile = self.fileForUri(ctx) + self.graphFiles[ctx] = GraphFile(self.notifier, outFile, ctx, + self.patch, self.getSubgraph) + + self.graphFiles[ctx].dirty(g) + def clientErrored(self, err, c): err.trap(twisted.internet.error.ConnectError) log.info("connection error- dropping client %r" % c) @@ -240,11 +298,6 @@ class Db(object): dict(updateUri=c.updateUri, label=c.label) for c in self.clients]}) -class Index(PrettyErrorHandler, cyclone.web.RequestHandler): - def get(self): - self.set_header("Content-Type", "application/xhtml+xml") - self.write(open("light9/rdfdb.xhtml").read()) - class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): pass @@ -258,7 +311,6 @@ class Patches(PrettyErrorHandler, cyclon def get(self): pass - class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler): def get(self): pass @@ -293,6 +345,13 @@ class Live(cyclone.websocket.WebSocketHa log.info("got message %s" % message) self.sendMessage(message) +class NoExts(cyclone.web.StaticFileHandler): + # .xhtml pages can be get() without .xhtml on them + def get(self, path, *args, **kw): + if path and '.' not in path: + path = path + ".xhtml" + cyclone.web.StaticFileHandler.get(self, path, *args, **kw) + if __name__ == "__main__": logging.basicConfig() log = logging.getLogger() @@ -311,18 +370,21 @@ if __name__ == "__main__": raise ValueError("missing --show http://...") db = Db() - + + from twisted.python import log as twlog + twlog.startLogging(sys.stdout) + port = 8051 reactor.listenTCP(port, cyclone.web.Application(handlers=[ - (r'/', Index), (r'/live', Live), (r'/graph', GraphResource), (r'/patches', Patches), (r'/graphClients', GraphClients), - (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler, - dict(path='lib')), - - ], db=db)) + (r'/(.*)', NoExts, + {"path" : "light9/rdfdb/web", + "default_filename" : "index.xhtml"}), + + ], debug=True, db=db)) log.info("serving on %s" % port) prof.run(reactor.run, profile=False) diff --git a/light9/rdfdb/clientsession.py b/light9/rdfdb/clientsession.py --- a/light9/rdfdb/clientsession.py +++ b/light9/rdfdb/clientsession.py @@ -4,6 +4,7 @@ multiple instances of that client separa """ from rdflib import URIRef from urllib import quote +from light9 import showconfig def add_option(parser): parser.add_option( @@ -12,5 +13,5 @@ def add_option(parser): default='default') def getUri(appName, opts): - return URIRef("http://example.com/session/%s/%s" % - (appName, quote(opts.session, safe=''))) + return URIRef("%s/sessions/%s/%s" % (showconfig.showUri(), appName, + quote(opts.session, safe=''))) diff --git a/light9/rdfdb/graphfile.py b/light9/rdfdb/graphfile.py --- a/light9/rdfdb/graphfile.py +++ b/light9/rdfdb/graphfile.py @@ -1,11 +1,13 @@ -import logging, traceback +import logging, traceback, os, time from twisted.python.filepath import FilePath +from twisted.internet import reactor from twisted.internet.inotify import IN_CLOSE_WRITE, IN_MOVED_FROM from rdflib import Graph from light9.rdfdb.patch import Patch from light9.rdfdb.rdflibpatch import inContext -log = logging.getLogger() +log = logging.getLogger('graphfile') +iolog = logging.getLogger('io') class GraphFile(object): """ @@ -18,13 +20,28 @@ class GraphFile(object): self.path, self.uri = path, uri self.patch, self.getSubgraph = patch, getSubgraph + if not os.path.exists(path): + # can't start notify until file exists + try: + os.makedirs(os.path.dirname(path)) + except OSError: + pass + f = open(path, "w") + f.close() + iolog.info("created %s", path) + + self.flushDelay = 2 # seconds until we have to call flush() when dirty + self.writeCall = None # or DelayedCall + self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote notifier.watch(FilePath(path), mask=IN_CLOSE_WRITE | IN_MOVED_FROM, callbacks=[self.notify]) - self.reread() def notify(self, notifier, filepath, mask): - log.info("file %s changed" % filepath) + if filepath.getModificationTime() == self.lastWriteTimestamp: + log.debug("file %s changed, but we did this write", filepath) + return + log.info("file %s changed", filepath) try: self.reread() except Exception: diff --git a/light9/rdfdb/syncedgraph.py b/light9/rdfdb/syncedgraph.py --- a/light9/rdfdb/syncedgraph.py +++ b/light9/rdfdb/syncedgraph.py @@ -144,6 +144,23 @@ class PatchSender(object): def _sendPatchErr(self, e): self._currentSendPatchRequest = None + # we're probably out of sync with the master now, since + # SyncedGraph.patch optimistically applied the patch to our + # local graph already. What happens to this patch? What + # happens to further pending patches? Some of the further + # patches, especially, may be commutable with the bad one and + # might still make sense to apply to the master graph. + + # if someday we are folding pending patches together, this + # would be the time to UNDO that and attempt the original + # separate patches again + + # this should screen for 409 conflict responses and raise a + # special exception for that, so SyncedGraph.sendFailed can + # screen for only that type + + # this code is going away; we're going to raise an exception that contains all the pending patches + log.error("_sendPatchErr") log.error(e) self._continueSending() @@ -192,6 +209,31 @@ class SyncedGraph(object): self._sender = PatchSender('http://localhost:8051/patches', self.updateResource) + def resync(self): + """ + get the whole graph again from the server (e.g. we had a + conflict while applying a patch and want to return to the + truth). + + To avoid too much churn, we remember our old graph and diff it + against the replacement. This way, our callers only see the + corrections. + + Edits you make during a resync will surely be lost, so I + should just fail them. There should be a notification back to + UIs who want to show that we're doing a resync. + """ + return cyclone.httpclient.fetch( + url="http://localhost:8051/graph", + method="GET", + headers={'Accept':'x-trig'}, + ).addCallback(self._resyncGraph) + + def _resyncGraph(self, response): + pass + #diff against old entire graph + #broadcast that change + def register(self, label): def done(x): @@ -216,7 +258,17 @@ class SyncedGraph(object): patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) self.updateOnPatch(p) - self._sender.sendPatch(p) + self._sender.sendPatch(p).addErrback(self.sendFailed) + + def sendFailed(self, result): + """ + we asked for a patch to be queued and sent to the master, and + that ultimately failed because of a conflict + """ + #i think we should receive back all the pending patches, + #do a resysnc here, + #then requeue all the pending patches (minus the failing one?) after that's done. + def patchObject(self, context, subject, predicate, newObject): """send a patch which removes existing values for (s,p,*,c) @@ -231,6 +283,14 @@ class SyncedGraph(object): delQuads=existing, addQuads=[(subject, predicate, newObject, context)])) + def patchMapping(self, context, subject, predicate, keyPred, valuePred, newKey, newValue): + """ + proposed api for updating things like ?session :subSetting [ + :sub ?s; :level ?v ]. Keyboardcomposer has an implementation + already. There should be a complementary readMapping that gets + you a value since that's tricky too + """ + def addHandler(self, func): """ run this (idempotent) func, noting what graph values it