Changeset - e53e78db7b17
[Not reviewed]
default
0 2 0
Drew Perttula - 12 years ago 2013-06-04 22:46:34
drewp@bigasterisk.com
refactor file watching. notice new files and dirs.
Ignore-this: 7f571fd9b309b54cff22b2fee5e68322
2 files changed with 82 insertions and 56 deletions:
0 comments (0 inline, 0 general)
bin/rdfdb
Show inline comments
 
@@ -106,12 +106,14 @@ Our web ui:
 
    something. clicking any resource from the other displays takes you
 
    to this, focused on that resource
 

	
 
"""
 
from twisted.internet import reactor
 
import twisted.internet.error
 
from twisted.python.filepath import FilePath
 
from twisted.internet.inotify import humanReadableMask, IN_CREATE
 
import sys, optparse, logging, json, os
 
import cyclone.web, cyclone.httpclient, cyclone.websocket
 
sys.path.append(".")
 
from light9 import networking, showconfig, prof
 
from rdflib import ConjunctiveGraph, URIRef, Graph
 
from light9.rdfdb.graphfile import GraphFile
 
@@ -152,72 +154,109 @@ class Client(object):
 
            addQuads=self.db.graph.quads(ALLSTMTS),
 
            delQuads=[]))
 

	
 
    def sendPatch(self, p):
 
        return syncedgraph.sendPatch(self.updateUri, p)
 

	
 
class Db(object):
 
class WatchedFiles(object):
 
    """
 
    the master graph, all the connected clients, all the files we're watching
 
    find files, notice new files.
 

	
 
    This object watches directories. Each GraphFile watches its own file.
 
    """
 
    def __init__(self):
 
    def __init__(self, topDirsToWatch, patch, getSubgraph):
 
        self.topDirsToWatch = topDirsToWatch
 
        self.patch, self.getSubgraph = patch, getSubgraph
 
        
 
        # files from cwd become uris starting with this. *should* be
 
        # building uris from the show uri in $LIGHT9_SHOW/URI
 
        # instead. Who wants to keep their data in the same dir tree
 
        # as the source code?!
 
        self.topUri = URIRef("http://light9.bigasterisk.com/")
 

	
 
        self.clients = []
 
        self.graph = ConjunctiveGraph()
 

	
 
        self.graphFiles = {} # context uri : GraphFile
 
        
 
        self.notifier = INotify()
 
        self.notifier.startReading()
 
        self.graphFiles = {} # context uri : GraphFile
 

	
 
        
 
        self.findAndLoadFiles()
 

	
 
    def findAndLoadFiles(self):
 
        self.initialLoad = True
 
        try:
 
            dirs = [
 
                "show/dance2012/sessions",
 
                "show/dance2012/subs",
 
                "show/dance2012/subterms",
 
                ]
 

	
 
            for topdir in dirs:
 
            for topdir in self.topDirsToWatch:
 
                for dirpath, dirnames, filenames in os.walk(topdir):
 
                    for base in filenames:
 
                        self.watchFile(os.path.join(dirpath, base))
 
                # todo: also notice new files in this dir
 

	
 
            self.watchFile("show/dance2012/config.n3")
 
            self.watchFile("show/dance2012/patch.n3")
 
                    self.notifier.watch(FilePath(dirpath), autoAdd=True,
 
                                        callbacks=[self.dirChange])
 
        finally:
 
            self.initialLoad = False
 

	
 
        self.summarizeToLog()
 
    def dirChange(self, watch, path, mask):
 
        if mask & IN_CREATE:
 
            self.watchFile(path.path)
 
            
 
    def watchFile(self, inFile):
 
        if not isinstance(inFile, FilePath):
 
            inFile = FilePath(inFile)
 
        if not inFile.isfile():
 
            return
 
        if inFile.splitext()[1] not in ['.n3']:
 
            return
 
        ctx = self.uriFromFile(inFile)
 
        gf = GraphFile(self.notifier, inFile.path, ctx,
 
                       self.patch, self.getSubgraph)
 
        self.graphFiles[ctx] = gf
 
        gf.reread()
 

	
 
    def dirtyFiles(self, ctxs):
 
        """mark dirty the files that we watch in these contexts.
 

	
 
        the ctx might not be a file that we already read; it might be
 
        for a new file we have to create, or it might be for a
 
        transient context that we're not going to save
 

	
 
        if it's a ctx with no file, error
 
        """
 
        for ctx in ctxs:
 
            g = self.getSubgraph(ctx)
 

	
 
            if ctx not in self.graphFiles:
 
                outFile = self.fileForUri(ctx)
 
                self.graphFiles[ctx] = GraphFile(self.notifier, outFile, ctx,
 
                                                 self.patch, self.getSubgraph)
 
            
 
            self.graphFiles[ctx].dirty(g)
 

	
 
    def uriFromFile(self, filename):
 
        if filename.endswith('.n3'):
 
            # some legacy files don't end with n3. when we write them
 
            # back this might not go so well
 
            filename = filename[:-len('.n3')]
 
        return URIRef(self.topUri + filename)
 
        if isinstance(filename, FilePath):
 
            filename = filename.path
 
        assert filename.endswith('.n3'), filename
 
        return URIRef(self.topUri + filename[:-len('.n3')])
 

	
 
    def fileForUri(self, ctx):
 
        assert isinstance(ctx, URIRef), ctx
 
        if not ctx.startswith(self.topUri):
 
            raise ValueError("don't know what filename to use for %s" % ctx)
 
        return ctx[len(self.topUri):] + ".n3"
 

	
 
    def watchFile(self, inFile):
 
        ctx = self.uriFromFile(inFile)
 
        gf = GraphFile(self.notifier, inFile, ctx, self.patch, self.getSubgraph)
 
        self.graphFiles[ctx] = gf
 
        gf.reread()
 
        
 
class Db(object):
 
    """
 
    the master graph, all the connected clients, all the files we're watching
 
    """
 
    def __init__(self, topDirsToWatch):
 
      
 
        self.clients = []
 
        self.graph = ConjunctiveGraph()
 

	
 
        self.watchedFiles = WatchedFiles(topDirsToWatch,
 
                                         self.patch, self.getSubgraph)
 
        
 
        self.summarizeToLog()
 

	
 
    def patch(self, p, dueToFileChange=False):
 
        """
 
        apply this patch to the master graph then notify everyone about it
 

	
 
        dueToFileChange if this is a patch describing an edit we read
 
@@ -229,43 +268,23 @@ class Db(object):
 
        ctx = p.getContext()
 
        log.info("patching graph %s -%d +%d" % (
 
            ctx, len(p.delQuads), len(p.addQuads)))
 

	
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        senderUpdateUri = getattr(p, 'senderUpdateUri', None)
 
        #if not self.initialLoad:
 
        #    self.summarizeToLog()
 

	
 
        for c in self.clients:
 
            if c.updateUri == senderUpdateUri:
 
                # this client has self-applied the patch already
 
                continue
 
            d = c.sendPatch(p)
 
            d.addErrback(self.clientErrored, c)
 
        if not dueToFileChange:
 
            self.dirtyFiles([ctx])
 
            self.watchedFiles.dirtyFiles([ctx])
 
        sendToLiveClients(asJson=p.jsonRepr)
 

	
 
    def dirtyFiles(self, ctxs):
 
        """mark dirty the files that we watch in these contexts.
 

	
 
        the ctx might not be a file that we already read; it might be
 
        for a new file we have to create, or it might be for a
 
        transient context that we're not going to save
 

	
 
        if it's a ctx with no file, error
 
        """
 
        for ctx in ctxs:
 
            g = self.getSubgraph(ctx)
 

	
 
            if ctx not in self.graphFiles:
 
                outFile = self.fileForUri(ctx)
 
                self.graphFiles[ctx] = GraphFile(self.notifier, outFile, ctx,
 
                                                 self.patch, self.getSubgraph)
 

	
 
            self.graphFiles[ctx].dirty(g)
 

	
 
    def clientErrored(self, err, c):
 
        err.trap(twisted.internet.error.ConnectError)
 
        log.info("connection error- dropping client %r" % c)
 
        self.clients.remove(c)
 
        self.sendClientsToAllLivePages()
 

	
 
@@ -372,13 +391,13 @@ if __name__ == "__main__":
 

	
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
 

	
 
    if not options.show:
 
        raise ValueError("missing --show http://...")
 

	
 
    db = Db()
 
    db = Db(topDirsToWatch=['show/dance2013'])
 

	
 
    from twisted.python import log as twlog
 
    twlog.startLogging(sys.stdout)
 

	
 
    port = 8051
 
    reactor.listenTCP(port, cyclone.web.Application(handlers=[
light9/rdfdb/graphfile.py
Show inline comments
 
@@ -35,15 +35,20 @@ class GraphFile(object):
 
        self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote
 
        notifier.watch(FilePath(path),
 
                       mask=IN_CLOSE_WRITE | IN_MOVED_FROM,
 
                       callbacks=[self.notify])
 
      
 
    def notify(self, notifier, filepath, mask):
 
        if filepath.getModificationTime() == self.lastWriteTimestamp:
 
            log.debug("file %s changed, but we did this write", filepath)
 
        try:
 
            if filepath.getModificationTime() == self.lastWriteTimestamp:
 
                log.debug("file %s changed, but we did this write", filepath)
 
                return
 
        except OSError as e:
 
            log.error("watched file %s: %r" % (filepath, e))
 
            return
 
            
 
        log.info("file %s changed", filepath)
 
        try:
 
            self.reread()
 
        except Exception:
 
            traceback.print_exc()
 

	
 
@@ -54,16 +59,18 @@ class GraphFile(object):
 
        try:
 
            new.parse(location=self.path, format='n3')
 
        except SyntaxError as e:
 
            print e
 
            log.error("syntax error in %s" % self.path)
 
            return
 
        except IOError as e:
 
            log.error("rereading %s: %r" % (self.uri, e))
 
            return
 

	
 
        old = inContext(old, self.uri)
 
        new = inContext(new, self.uri)
 
        print "old %s new %s" % (old, new)
 

	
 
        p = Patch.fromDiff(old, new)
 
        if p:
 
            self.patch(p, dueToFileChange=True)
 

	
 
    def dirty(self, graph):
0 comments (0 inline, 0 general)