changeset 814:1ae8e6b287e3

improvements to file watching. outline of how resync will work
author drewp@bigasterisk.com
date Sun, 30 Sep 2012 07:11:49 +0000
parents 6f984ce851e2
children d7f1f868eb6c
files bin/keyboardcomposer bin/rdfdb light9/rdfdb/clientsession.py light9/rdfdb/graphfile.py light9/rdfdb/syncedgraph.py
diffstat 5 files changed, 181 insertions(+), 41 deletions(-)
--- a/bin/keyboardcomposer	Sun Sep 30 07:10:51 2012 +0000
+++ b/bin/keyboardcomposer	Sun Sep 30 07:11:49 2012 +0000
@@ -119,7 +119,7 @@
 
            ?session :subSetting [a :SubSetting; :sub ?s; :level ?l]
         """
-
+        # move to syncedgraph patchMapping
         with self.sub.graph.currentState() as graph:
             adds = set([])
             for setting in graph.objects(self.session, L9['subSetting']):
@@ -127,7 +127,7 @@
                     break
             else:
                 setting = URIRef(self.session + "/setting/%s" %
-                                 random.randrange(999999))
+                                 random.randrange(999999999))
                 adds.update([
                     (self.session, L9['subSetting'], setting, self.session),
                     (setting, RDF.type, L9['SubSetting'], self.session),
@@ -143,7 +143,7 @@
 
     def updateLevelFromGraph(self):
         """read rdf level, write it to subbox.slider_var"""
-        
+        # move this to syncedgraph readMapping
         graph = self.sub.graph
 
         for setting in graph.objects(self.session, L9['subSetting']):
--- a/bin/rdfdb	Sun Sep 30 07:10:51 2012 +0000
+++ b/bin/rdfdb	Sun Sep 30 07:11:49 2012 +0000
@@ -155,27 +155,62 @@
     the master graph, all the connected clients, all the files we're watching
     """
     def __init__(self):
+        # files from cwd become uris starting with this. *should* be
+        # building uris from the show uri in $LIGHT9_SHOW/URI
+        # instead. Who wants to keep their data in the same dir tree
+        # as the source code?!
+        self.topUri = URIRef("http://light9.bigasterisk.com/")
+
         self.clients = []
         self.graph = ConjunctiveGraph()
 
-        notifier = INotify()
-        notifier.startReading()
+        self.notifier = INotify()
+        self.notifier.startReading()
+        self.graphFiles = {} # context uri : GraphFile
+
+        self.findAndLoadFiles()
+
+    def findAndLoadFiles(self):
+        self.initialLoad = True
+        try:
+            dirs = [
+                "show/dance2012/sessions",
+                "show/dance2012/subs",
+                "show/dance2012/subterms",
+                ]
+
+            for topdir in dirs:
+                for dirpath, dirnames, filenames in os.walk(topdir):
+                    for base in filenames:
+                        self.watchFile(os.path.join(dirpath, base))
+                # todo: also notice new files in this dir
 
-        for inFile in [#"show/dance2012/config.n3",
-                       "show/dance2012/patch.n3",
-                       "show/dance2012/subs/bcools",
-                       "show/dance2012/subs/bwarm",
-                       "show/dance2012/subs/house",
-                       "demo.n3",
-                       ]:
-            self.g = GraphFile(notifier,
-                               inFile,
-                               URIRef("http://example.com/file/%s" %
-                                      os.path.basename(inFile)),
-                               self.patch,
-                               self.getSubgraph)
+            self.watchFile("show/dance2012/config.n3")
+            self.watchFile("show/dance2012/patch.n3")
+        finally:
+            self.initialLoad = False
+            
+        self.summarizeToLog()
 
-    def patch(self, p):
+    def uriFromFile(self, filename):
+        if filename.endswith('.n3'):
+            # some legacy files don't end with n3. when we write them
+            # back this might not go so well
+            filename = filename[:-len('.n3')]
+        return URIRef(self.topUri + filename)
+        
+    def fileForUri(self, ctx):
+        if not ctx.startswith(self.topUri):
+            raise ValueError("don't know what filename to use for %s" % ctx)
+        return ctx[len(self.topUri):] + ".n3"
+
+    def watchFile(self, inFile):
+        ctx = self.uriFromFile(inFile)
+        gf = GraphFile(self.notifier, inFile, ctx, self.patch, self.getSubgraph)
+        self.graphFiles[ctx] = gf
+        gf.reread()
+        
+    def patch(self, p, dueToFileChange=False):
         """
         apply this patch to the master graph then notify everyone about it
 
@@ -185,20 +220,43 @@
         if p has a senderUpdateUri attribute, we won't send this patch
         back to the sender with that updateUri
         """
-        log.info("patching graph -%d +%d" % (len(p.delQuads), len(p.addQuads)))
+        ctx = p.getContext()
+        log.info("patching graph %s -%d +%d" % (
+            ctx, len(p.delQuads), len(p.addQuads)))
 
         patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
         senderUpdateUri = getattr(p, 'senderUpdateUri', None)
-        self.summarizeToLog()
+        #if not self.initialLoad:
+        #    self.summarizeToLog()
         for c in self.clients:
-            print "send to %s? %s %s" % (c, c.updateUri, senderUpdateUri)
             if c.updateUri == senderUpdateUri:
                 # this client has self-applied the patch already
                 continue
             d = c.sendPatch(p)
             d.addErrback(self.clientErrored, c)
+        if not dueToFileChange:
+            self.dirtyFiles([ctx])
         sendToLiveClients(asJson=p.jsonRepr)
 
+    def dirtyFiles(self, ctxs):
+        """mark dirty the files that we watch in these contexts.
+
+        the ctx might not be a file that we already read; it might be
+        for a new file we have to create, or it might be for a
+        transient context that we're not going to save
+
+        if it's a ctx with no file, error
+        """
+        for ctx in ctxs:
+            g = self.getSubgraph(ctx)
+
+            if ctx not in self.graphFiles:
+                outFile = self.fileForUri(ctx)
+                self.graphFiles[ctx] = GraphFile(self.notifier, outFile, ctx,
+                                                 self.patch, self.getSubgraph)
+            
+            self.graphFiles[ctx].dirty(g)
+
     def clientErrored(self, err, c):
         err.trap(twisted.internet.error.ConnectError)
         log.info("connection error- dropping client %r" % c)
@@ -240,11 +298,6 @@
             dict(updateUri=c.updateUri, label=c.label)
             for c in self.clients]})        
 
-class Index(PrettyErrorHandler, cyclone.web.RequestHandler):
-    def get(self):
-        self.set_header("Content-Type", "application/xhtml+xml")
-        self.write(open("light9/rdfdb.xhtml").read())
-
 class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         pass
@@ -258,7 +311,6 @@
     def get(self):
         pass
 
-
 class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
     def get(self):
         pass
@@ -293,6 +345,13 @@
         log.info("got message %s" % message)
         self.sendMessage(message)
 
+class NoExts(cyclone.web.StaticFileHandler):
+    # .xhtml pages can be get() without .xhtml on them
+    def get(self, path, *args, **kw):
+        if path and '.' not in path:
+            path = path + ".xhtml"
+        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)
+
 if __name__ == "__main__":
     logging.basicConfig()
     log = logging.getLogger()
@@ -311,18 +370,21 @@
         raise ValueError("missing --show http://...")
 
     db = Db()
-
+    
+    from twisted.python import log as twlog
+    twlog.startLogging(sys.stdout)
+            
     port = 8051
     reactor.listenTCP(port, cyclone.web.Application(handlers=[
-        (r'/', Index),
         (r'/live', Live),
         (r'/graph', GraphResource),
         (r'/patches', Patches),
         (r'/graphClients', GraphClients),
 
-        (r"/(jquery-1\.7\.2\.min\.js)", cyclone.web.StaticFileHandler,
-         dict(path='lib')),
-        
-        ], db=db))
+        (r'/(.*)', NoExts,
+         {"path" : "light9/rdfdb/web",
+          "default_filename" : "index.xhtml"}),
+
+        ], debug=True, db=db))
     log.info("serving on %s" % port)
     prof.run(reactor.run, profile=False)
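
Aside, not code from the changeset: the topUri comment and the uriFromFile/fileForUri pair above define the whole filename <-> context-URI mapping, so a minimal standalone sketch of the round trip (assuming the same hard-coded topUri) looks like this. Note the caveat from the inline comment: a legacy file that lacks the .n3 extension does not round-trip exactly, because fileForUri always appends .n3.

    from rdflib import URIRef

    TOP_URI = URIRef("http://light9.bigasterisk.com/")

    def uriFromFile(filename):
        # legacy files may lack the .n3 extension; strip it when present
        if filename.endswith('.n3'):
            filename = filename[:-len('.n3')]
        return URIRef(TOP_URI + filename)

    def fileForUri(ctx):
        if not ctx.startswith(TOP_URI):
            raise ValueError("don't know what filename to use for %s" % ctx)
        return ctx[len(TOP_URI):] + ".n3"

    ctx = uriFromFile("show/dance2012/subs/bcools")
    assert str(ctx) == "http://light9.bigasterisk.com/show/dance2012/subs/bcools"
    assert fileForUri(ctx) == "show/dance2012/subs/bcools.n3"
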
--- a/light9/rdfdb/clientsession.py	Sun Sep 30 07:10:51 2012 +0000
+++ b/light9/rdfdb/clientsession.py	Sun Sep 30 07:11:49 2012 +0000
@@ -4,6 +4,7 @@
 """
 from rdflib import URIRef
 from urllib import quote
+from light9 import showconfig
 
 def add_option(parser):
     parser.add_option(
@@ -12,5 +13,5 @@
         default='default')
 
 def getUri(appName, opts):
-    return URIRef("http://example.com/session/%s/%s" %
-                  (appName, quote(opts.session, safe='')))
+    return URIRef("%s/sessions/%s/%s" % (showconfig.showUri(), appName,
+                                         quote(opts.session, safe='')))
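
Aside: a hedged illustration of what the new getUri yields. The show URI below is a hypothetical stand-in for whatever showconfig.showUri() returns; the function body mirrors the one above.

    from urllib import quote
    from rdflib import URIRef

    def sessionUri(showUri, appName, session):
        # same construction as clientsession.getUri, with showUri
        # standing in for showconfig.showUri()
        return URIRef("%s/sessions/%s/%s" % (showUri, appName,
                                             quote(session, safe='')))

    print sessionUri("http://light9.bigasterisk.com/show/dance2012",
                     "keyboardcomposer", "default")
    # -> http://light9.bigasterisk.com/show/dance2012/sessions/keyboardcomposer/default
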
--- a/light9/rdfdb/graphfile.py	Sun Sep 30 07:10:51 2012 +0000
+++ b/light9/rdfdb/graphfile.py	Sun Sep 30 07:11:49 2012 +0000
@@ -1,11 +1,13 @@
-import logging, traceback
+import logging, traceback, os, time
 from twisted.python.filepath import FilePath
+from twisted.internet import reactor
 from twisted.internet.inotify import IN_CLOSE_WRITE, IN_MOVED_FROM
 from rdflib import Graph
 from light9.rdfdb.patch import Patch
 from light9.rdfdb.rdflibpatch import inContext
 
-log = logging.getLogger()
+log = logging.getLogger('graphfile')
+iolog = logging.getLogger('io')
 
 class GraphFile(object):
     """
@@ -18,13 +20,28 @@
         self.path, self.uri = path, uri
         self.patch, self.getSubgraph = patch, getSubgraph
 
+        if not os.path.exists(path):
+            # can't start notify until file exists
+            try:
+                os.makedirs(os.path.dirname(path))
+            except OSError:
+                pass
+            f = open(path, "w")
+            f.close()
+            iolog.info("created %s", path)
+
+        self.flushDelay = 2 # seconds until we have to call flush() when dirty
+        self.writeCall = None # or DelayedCall
+        self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote
         notifier.watch(FilePath(path),
                        mask=IN_CLOSE_WRITE | IN_MOVED_FROM,
                        callbacks=[self.notify])
-        self.reread()
       
     def notify(self, notifier, filepath, mask):
-        log.info("file %s changed" % filepath)
+        if filepath.getModificationTime() == self.lastWriteTimestamp:
+            log.debug("file %s changed, but we did this write", filepath)
+            return
+        log.info("file %s changed", filepath)
         try:
             self.reread()
         except Exception:
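
Aside: this hunk only introduces flushDelay, writeCall, and lastWriteTimestamp; the dirty()/flush() code that uses them is not shown. Purely as a sketch of what those fields suggest (assumptions throughout, not the changeset's implementation), a debounced writer could look like:

    import os
    from twisted.internet import reactor

    class DebouncedGraphWriter(object):
        """write a graph to disk at most once per flushDelay seconds"""
        def __init__(self, path, flushDelay=2):
            self.path = path
            self.flushDelay = flushDelay
            self.writeCall = None          # pending IDelayedCall, or None
            self.lastWriteTimestamp = 0    # mtime of our own last write
            self.pendingGraph = None

        def dirty(self, graph):
            # remember the latest graph and (re)start the flush timer
            self.pendingGraph = graph
            if self.writeCall is not None and self.writeCall.active():
                self.writeCall.reset(self.flushDelay)
            else:
                self.writeCall = reactor.callLater(self.flushDelay, self.flush)

        def flush(self):
            self.writeCall = None
            f = open(self.path, "w")
            f.write(self.pendingGraph.serialize(format="n3"))
            f.close()
            # record our own mtime so an inotify callback, like
            # GraphFile.notify above, can skip rereading this write
            self.lastWriteTimestamp = os.path.getmtime(self.path)
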
--- a/light9/rdfdb/syncedgraph.py	Sun Sep 30 07:10:51 2012 +0000
+++ b/light9/rdfdb/syncedgraph.py	Sun Sep 30 07:11:49 2012 +0000
@@ -144,6 +144,23 @@
 
     def _sendPatchErr(self, e):
         self._currentSendPatchRequest = None
+        # we're probably out of sync with the master now, since
+        # SyncedGraph.patch optimistically applied the patch to our
+        # local graph already. What happens to this patch? What
+        # happens to further pending patches? Some of the further
+        # patches, especially, may be commutable with the bad one and
+        # might still make sense to apply to the master graph.
+
+        # if someday we are folding pending patches together, this
+        # would be the time to UNDO that and attempt the original
+        # separate patches again
+
+        # this should screen for 409 conflict responses and raise a
+        # special exception for that, so SyncedGraph.sendFailed can
+        # screen for only that type
+
+        # this code is going away; we're going to raise an exception that contains all the pending patches
+        log.error("_sendPatchErr")
         log.error(e)
         self._continueSending()
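
Aside: one concrete shape for the 409 screening described in those comments would be a dedicated exception type that sendFailed could trap. This is a hypothetical sketch; no such class exists in the changeset.

    class PatchConflict(ValueError):
        """the master graph rejected our patch (HTTP 409 Conflict)"""

    def classifyPatchSendError(statusCode, body=""):
        # hypothetical helper: distinguish a conflict (worth a resync
        # and a requeue of pending patches) from any other send failure
        if statusCode == 409:
            raise PatchConflict(body)
        raise ValueError("patch send failed: %s %s" % (statusCode, body))
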
         
@@ -192,6 +209,31 @@
         self._sender = PatchSender('http://localhost:8051/patches',
                                    self.updateResource)
 
+    def resync(self):
+        """
+        get the whole graph again from the server (e.g. we had a
+        conflict while applying a patch and want to return to the
+        truth).
+
+        To avoid too much churn, we remember our old graph and diff it
+        against the replacement. This way, our callers only see the
+        corrections.
+
+        Edits you make during a resync will surely be lost, so I
+        should just fail them. There should be a notification back to
+        UIs who want to show that we're doing a resync.
+        """
+        return cyclone.httpclient.fetch(
+            url="http://localhost:8051/graph",
+            method="GET",
+            headers={'Accept':'x-trig'},
+            ).addCallback(self._resyncGraph)
+
+    def _resyncGraph(self, response):
+        pass
+        #diff against old entire graph
+        #broadcast that change
+
     def register(self, label):
 
         def done(x):
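
Aside: the resync docstring says callers should only see corrections, and _resyncGraph's comments call for "diff against old entire graph / broadcast that change". A minimal sketch of that diff step, assuming the fetched server graph has already been parsed into a ConjunctiveGraph (the Patch keyword arguments follow the patchObject usage later in this file):

    from light9.rdfdb.patch import Patch

    def correctionsPatch(oldGraph, newGraph):
        """quads to delete and add so oldGraph's contents match newGraph's"""
        def asSet(g):
            # normalize contexts to their URIs so quads compare equal
            return set((s, p, o, c.identifier)
                       for s, p, o, c in g.quads((None, None, None)))
        old, new = asSet(oldGraph), asSet(newGraph)
        return Patch(delQuads=list(old - new), addQuads=list(new - old))
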
@@ -216,7 +258,17 @@
         
         patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
         self.updateOnPatch(p)
-        self._sender.sendPatch(p)
+        self._sender.sendPatch(p).addErrback(self.sendFailed)
+
+    def sendFailed(self, result):
+        """
+        we asked for a patch to be queued and sent to the master, and
+        that ultimately failed because of a conflict
+        """
+        #i think we should receive back all the pending patches,
+        #do a resync here,
+        #then requeue all the pending patches (minus the failing one?) after that's done.
+
 
     def patchObject(self, context, subject, predicate, newObject):
         """send a patch which removes existing values for (s,p,*,c)
@@ -231,6 +283,14 @@
             delQuads=existing,
             addQuads=[(subject, predicate, newObject, context)]))
 
+    def patchMapping(self, context, subject, predicate, keyPred, valuePred, newKey, newValue):
+        """
+        proposed api for updating things like ?session :subSetting [
+        :sub ?s; :level ?v ]. Keyboardcomposer has an implementation
+        already. There should be a complementary readMapping that gets
+        you a value since that's tricky too
+        """
+
     def addHandler(self, func):
         """
         run this (idempotent) func, noting what graph values it