Changeset - 1ae8e6b287e3
[Not reviewed]
default
0 5 0
drewp@bigasterisk.com - 12 years ago 2012-09-30 07:11:49
drewp@bigasterisk.com
improvements to file watching. outline of how resync will work
Ignore-this: 501c4f2076099364645cc27e9fe48f61
5 files changed with 181 insertions and 41 deletions:
0 comments (0 inline, 0 general)
bin/keyboardcomposer
Show inline comments
 
@@ -119,7 +119,7 @@ class SubmasterBox(Frame):
 

	
 
           ?session :subSetting [a :SubSetting; :sub ?s; :level ?l]
 
        """
 

	
 
        # move to syncedgraph patchMapping
 
        with self.sub.graph.currentState() as graph:
 
            adds = set([])
 
            for setting in graph.objects(self.session, L9['subSetting']):
 
@@ -127,7 +127,7 @@ class SubmasterBox(Frame):
 
                    break
 
            else:
 
                setting = URIRef(self.session + "/setting/%s" %
 
                                 random.randrange(999999))
 
                                 random.randrange(999999999))
 
                adds.update([
 
                    (self.session, L9['subSetting'], setting, self.session),
 
                    (setting, RDF.type, L9['SubSetting'], self.session),
 
@@ -143,7 +143,7 @@ class SubmasterBox(Frame):
 

	
 
    def updateLevelFromGraph(self):
 
        """read rdf level, write it to subbox.slider_var"""
 
        
 
        # move this to syncedgraph readMapping
 
        graph = self.sub.graph
 

	
 
        for setting in graph.objects(self.session, L9['subSetting']):
bin/rdfdb
Show inline comments
 
@@ -155,27 +155,62 @@ class Db(object):
 
    the master graph, all the connected clients, all the files we're watching
 
    """
 
    def __init__(self):
 
        # files from cwd become uris starting with this. *should* be
 
        # building uris from the show uri in $LIGHT9_SHOW/URI
 
        # instead. Who wants to keep their data in the same dir tree
 
        # as the source code?!
 
        self.topUri = URIRef("http://light9.bigasterisk.com/")
 

	
 
        self.clients = []
 
        self.graph = ConjunctiveGraph()
 

	
 
        notifier = INotify()
 
        notifier.startReading()
 
        self.notifier = INotify()
 
        self.notifier.startReading()
 
        self.graphFiles = {} # context uri : GraphFile
 

	
 
        self.findAndLoadFiles()
 

	
 
    def findAndLoadFiles(self):
 
        self.initialLoad = True
 
        try:
 
            dirs = [
 
                "show/dance2012/sessions",
 
                "show/dance2012/subs",
 
                "show/dance2012/subterms",
 
                ]
 

	
 
            for topdir in dirs:
 
                for dirpath, dirnames, filenames in os.walk(topdir):
 
                    for base in filenames:
 
                        self.watchFile(os.path.join(dirpath, base))
 
                # todo: also notice new files in this dir
 

	
 
        for inFile in [#"show/dance2012/config.n3",
 
                       "show/dance2012/patch.n3",
 
                       "show/dance2012/subs/bcools",
 
                       "show/dance2012/subs/bwarm",
 
                       "show/dance2012/subs/house",
 
                       "demo.n3",
 
                       ]:
 
            self.g = GraphFile(notifier,
 
                               inFile,
 
                               URIRef("http://example.com/file/%s" %
 
                                      os.path.basename(inFile)),
 
                               self.patch,
 
                               self.getSubgraph)
 
            self.watchFile("show/dance2012/config.n3")
 
            self.watchFile("show/dance2012/patch.n3")
 
        finally:
 
            self.initialLoad = False
 
            
 
        self.summarizeToLog()
 

	
 
    def patch(self, p):
 
    def uriFromFile(self, filename):
        """Return the graph context URI that corresponds to `filename`.

        A trailing '.n3' is dropped from the name and the remainder is
        appended to self.topUri.
        """
        suffix = '.n3'
        # Some legacy files carry no .n3 suffix at all; their names are
        # used unchanged, so writing them back out might not go so well.
        stem = filename[:-len(suffix)] if filename.endswith(suffix) else filename
        return URIRef(self.topUri + stem)
 
        
 
    def fileForUri(self, ctx):
        """Return the filename used to store the context URI `ctx`.

        The part of `ctx` that follows the self.topUri prefix becomes
        the path, with '.n3' appended.

        Raises ValueError when `ctx` does not start with self.topUri.
        """
        prefix = self.topUri
        if ctx.startswith(prefix):
            return ctx[len(prefix):] + ".n3"
        raise ValueError("don't know what filename to use for %s" % ctx)
 

	
 
    def watchFile(self, inFile):
        """Start tracking `inFile` and read its contents.

        The context URI comes from self.uriFromFile(inFile). A new
        GraphFile for the path is stored in self.graphFiles under that
        URI and is then asked to reread().
        """
        context = self.uriFromFile(inFile)
        graphFile = GraphFile(
            self.notifier, inFile, context, self.patch, self.getSubgraph)
        self.graphFiles[context] = graphFile
        graphFile.reread()
 
        
 
    def patch(self, p, dueToFileChange=False):
 
        """
 
        apply this patch to the master graph then notify everyone about it
 

	
 
@@ -185,20 +220,43 @@ class Db(object):
 
        if p has a senderUpdateUri attribute, we won't send this patch
 
        back to the sender with that updateUri
 
        """
 
        log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
 
        ctx = p.getContext()
 
        log.info("patching graph %s -%d +%d" % (
 
            ctx, len(p.delQuads), len(p.addQuads)))
 

	
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        senderUpdateUri = getattr(p, 'senderUpdateUri', None)
 
        self.summarizeToLog()
 
        #if not self.initialLoad:
 
        #    self.summarizeToLog()
 
        for c in self.clients:
 
            print "send to %s? %s %s" % (c, c.updateUri, senderUpdateUri)
 
            if c.updateUri == senderUpdateUri:
 
                # this client has self-applied the patch already
 
                continue
 
            d = c.sendPatch(p)
 
            d.addErrback(self.clientErrored, c)
 
        if not dueToFileChange:
 
            self.dirtyFiles([ctx])
 
        sendToLiveClients(asJson=p.jsonRepr)
 

	
 
    def dirtyFiles(self, ctxs):
        """Mark as dirty the watched file behind each context in `ctxs`.

        A context may belong to a file we have already read, to a new
        file that still has to be created, or to a transient context
        that is not going to be saved.

        For a context that is not in self.graphFiles yet, the filename
        comes from self.fileForUri and a new GraphFile is registered for
        it; a context with no file is an error.
        """
        for context in ctxs:
            # fetch the subgraph first, exactly as before, so the call
            # happens even when the filename lookup below fails
            subgraph = self.getSubgraph(context)

            if context in self.graphFiles:
                graphFile = self.graphFiles[context]
            else:
                outFile = self.fileForUri(context)
                graphFile = GraphFile(self.notifier, outFile, context,
                                      self.patch, self.getSubgraph)
                self.graphFiles[context] = graphFile

            graphFile.dirty(subgraph)
 

	
 
    def clientErrored(self, err, c):
 
        err.trap(twisted.internet.error.ConnectError)
 
        log.info("connection error- dropping client %r" % c)
 
@@ -240,11 +298,6 @@ class Db(object):
 
            dict(updateUri=c.updateUri, label=c.label)
 
            for c in self.clients]})        
 

	
 
class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
    """Request handler that answers GET with the rdfdb xhtml page."""
    def get(self):
        """Write light9/rdfdb.xhtml to the response as application/xhtml+xml."""
        self.set_header("Content-Type", "application/xhtml+xml")
        # use a context manager so the file handle is closed right away
        # instead of whenever the garbage collector gets to it
        with open("light9/rdfdb.xhtml") as page:
            self.write(page.read())
 

	
 
class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def get(self):
 
        pass
 
@@ -258,7 +311,6 @@ class Patches(PrettyErrorHandler, cyclon
 
    def get(self):
 
        pass
 

	
 

	
 
class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def get(self):
 
        pass
 
@@ -293,6 +345,13 @@ class Live(cyclone.websocket.WebSocketHa
 
        log.info("got message %s" % message)
 
        self.sendMessage(message)
 

	
 
class NoExts(cyclone.web.StaticFileHandler):
    """Static file handler where .xhtml pages can be requested without
    the .xhtml extension."""
    def get(self, path, *args, **kw):
        """Serve `path`, adding '.xhtml' when it is non-empty and has no dot."""
        needsExtension = bool(path) and '.' not in path
        if needsExtension:
            path += ".xhtml"
        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)
 

	
 
if __name__ == "__main__":
 
    logging.basicConfig()
 
    log = logging.getLogger()
 
@@ -311,18 +370,21 @@ if __name__ == "__main__":
 
        raise ValueError("missing --show http://...")
 

	
 
    db = Db()
 

	
 
    
 
    from twisted.python import log as twlog
 
    twlog.startLogging(sys.stdout)
 
            
 
    port = 8051
 
    reactor.listenTCP(port, cyclone.web.Application(handlers=[
 
        (r'/', Index),
 
        (r'/live', Live),
 
        (r'/graph', GraphResource),
 
        (r'/patches', Patches),
 
        (r'/graphClients', GraphClients),
 

	
 
        (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
 
         dict(path='lib')),
 
        
 
        ], db=db))
 
        (r'/(.*)', NoExts,
 
         {"path" : "light9/rdfdb/web",
 
          "default_filename" : "index.xhtml"}),
 

	
 
        ], debug=True, db=db))
 
    log.info("serving on %s" % port)
 
    prof.run(reactor.run, profile=False)
light9/rdfdb/clientsession.py
Show inline comments
 
@@ -4,6 +4,7 @@ multiple instances of that client separa
 
"""
 
from rdflib import URIRef
 
from urllib import quote
 
from light9 import showconfig
 

	
 
def add_option(parser):
 
    parser.add_option(
 
@@ -12,5 +13,5 @@ def add_option(parser):
 
        default='default')
 

	
 
def getUri(appName, opts):
 
    return URIRef("http://example.com/session/%s/%s" %
 
                  (appName, quote(opts.session, safe='')))
 
    return URIRef("%s/sessions/%s/%s" % (showconfig.showUri(), appName,
 
                                         quote(opts.session, safe='')))
light9/rdfdb/graphfile.py
Show inline comments
 
import logging, traceback
 
import logging, traceback, os, time
 
from twisted.python.filepath import FilePath
 
from twisted.internet import reactor
 
from twisted.internet.inotify import IN_CLOSE_WRITE, IN_MOVED_FROM
 
from rdflib import Graph
 
from light9.rdfdb.patch import Patch
 
from light9.rdfdb.rdflibpatch import inContext
 

	
 
log = logging.getLogger()
 
log = logging.getLogger('graphfile')
 
iolog = logging.getLogger('io')
 

	
 
class GraphFile(object):
 
    """
 
@@ -18,13 +20,28 @@ class GraphFile(object):
 
        self.path, self.uri = path, uri
 
        self.patch, self.getSubgraph = patch, getSubgraph
 

	
 
        if not os.path.exists(path):
 
            # can't start notify until file exists
 
            try:
 
                os.makedirs(os.path.dirname(path))
 
            except OSError:
 
                pass
 
            f = open(path, "w")
 
            f.close()
 
            iolog.info("created %s", path)
 

	
 
        self.flushDelay = 2 # seconds until we have to call flush() when dirty
 
        self.writeCall = None # or DelayedCall
 
        self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote
 
        notifier.watch(FilePath(path),
 
                       mask=IN_CLOSE_WRITE | IN_MOVED_FROM,
 
                       callbacks=[self.notify])
 
        self.reread()
 
      
 
    def notify(self, notifier, filepath, mask):
 
        log.info("file %s changed" % filepath)
 
        if filepath.getModificationTime() == self.lastWriteTimestamp:
 
            log.debug("file %s changed, but we did this write", filepath)
 
            return
 
        log.info("file %s changed", filepath)
 
        try:
 
            self.reread()
 
        except Exception:
light9/rdfdb/syncedgraph.py
Show inline comments
 
@@ -144,6 +144,23 @@ class PatchSender(object):
 

	
 
    def _sendPatchErr(self, e):
        """Handle a failed patch send: clear the in-flight request, log
        the error, and continue sending.

        `e` is the error that gets logged; presumably a twisted Failure
        passed in by an errback -- TODO confirm against the caller.
        """
        self._currentSendPatchRequest = None
        # we're probably out of sync with the master now, since
        # SyncedGraph.patch optimistically applied the patch to our
        # local graph already. What happens to this patch? What
        # happens to further pending patches? Some of the further
        # patches, especially, may be commutable with the bad one and
        # might still make sense to apply to the master graph.

        # if someday we are folding pending patches together, this
        # would be the time to UNDO that and attempt the original
        # separate patches again

        # this should screen for 409 conflict responses and raise a
        # special exception for that, so SyncedGraph.sendFailed can
        # screen for only that type

        # this code is going away; we're going to raise an exception
        # that contains all the pending patches
        log.error("_sendPatchErr")
        log.error(e)
        self._continueSending()
 
        
 
@@ -192,6 +209,31 @@ class SyncedGraph(object):
 
        self._sender = PatchSender('http://localhost:8051/patches',
 
                                   self.updateResource)
 

	
 
    def resync(self):
        """
        Fetch the whole graph again from the server (e.g. after a
        conflict while applying a patch, when we want to return to the
        truth). Returns the result of adding self._resyncGraph as a
        callback on the fetch.

        Intended design: to avoid too much churn, remember the old
        graph and diff it against the replacement, so callers only see
        the corrections.

        Edits made during a resync will surely be lost, so they should
        just fail. UIs that want to show a resync in progress should
        get a notification.
        """
        fetching = cyclone.httpclient.fetch(
            url="http://localhost:8051/graph",
            method="GET",
            headers={'Accept': 'x-trig'})
        return fetching.addCallback(self._resyncGraph)
 

	
 
    def _resyncGraph(self, response):
        """Callback for the graph fetch started by resync().

        Not implemented yet: `response` is ignored and nothing happens.
        """
        pass
        # TODO: diff against old entire graph
        # TODO: broadcast that change
 

	
 
    def register(self, label):
 

	
 
        def done(x):
 
@@ -216,7 +258,17 @@ class SyncedGraph(object):
 
        
 
        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
 
        self.updateOnPatch(p)
 
        self._sender.sendPatch(p)
 
        self._sender.sendPatch(p).addErrback(self.sendFailed)
 

	
 
    def sendFailed(self, result):
        """
        We asked for a patch to be queued and sent to the master, and
        that ultimately failed because of a conflict.

        This is only an outline so far: the body does nothing with
        `result` (presumably the failure from the send -- TODO confirm).
        """
        # I think we should receive back all the pending patches,
        # do a resync here,
        # then requeue all the pending patches (minus the failing one?)
        # after that's done.
 

	
 

	
 
    def patchObject(self, context, subject, predicate, newObject):
 
        """send a patch which removes existing values for (s,p,*,c)
 
@@ -231,6 +283,14 @@ class SyncedGraph(object):
 
            delQuads=existing,
 
            addQuads=[(subject, predicate, newObject, context)]))
 

	
 
    def patchMapping(self, context, subject, predicate, keyPred, valuePred, newKey, newValue):
        """
        Proposed API for updating things like ?session :subSetting [
        :sub ?s; :level ?v ]. Keyboardcomposer has an implementation
        already. There should be a complementary readMapping that gets
        you a value, since that's tricky too.

        Not implemented yet: the body is only this docstring, so the
        call does nothing. Presumably keyPred/valuePred name the
        predicates on the inner node (:sub and :level in the example)
        and newKey/newValue are their new objects -- TODO confirm once
        this is written.
        """
 

	
 
    def addHandler(self, func):
 
        """
 
        run this (idempotent) func, noting what graph values it
0 comments (0 inline, 0 general)