Changeset - 54027815c6cc
[Not reviewed]
default
0 1 0
Drew Perttula - 11 years ago 2014-05-27 05:48:25
drewp@bigasterisk.com
rdfdb can return 'application/n-quads' response
Ignore-this: 3ab9cdd77f1c088af031c714b3ca2804
1 file changed with 2 insertions and 0 deletions:
0 comments (0 inline, 0 general)
bin/rdfdb
Show inline comments
 
@@ -190,274 +190,276 @@ class WatchedFiles(object):
 
                    for base in filenames:
 
                        self.watchFile(os.path.join(dirpath, base))
 
                    self.notifier.watch(FilePath(dirpath), autoAdd=True,
 
                                        callbacks=[self.dirChange])
 
        finally:
 
            self.initialLoad = False
 

	
 
    def dirChange(self, watch, path, mask):
        """
        callback given to self.notifier.watch for a watched directory.

        Only events whose mask includes IN_CREATE are acted on: the
        created path is logged and offered to watchFile, which decides
        whether it is worth tracking. Every other event is ignored.
        """
        if not (mask & IN_CREATE):
            return
        log.debug("%s created; consider adding a watch", path)
        createdPath = path.path
        self.watchFile(createdPath)
 
            
 
    def watchFile(self, inFile):
        """
        maybe start tracking inFile as a GraphFile in self.graphFiles

        inFile should be relative to one of self.topDirsToWatch rather
        than absolute (e.g. the path inside a FilePath), because the
        exact relative form becomes part of the context URI. An
        absolute path that sits under one of the top dirs is rewritten
        to the relative form; any other path raises ValueError.

        Paths that are not existing files, that don't have the .n3
        extension, that contain '-rules', or that end with config.n3
        are silently skipped.
        """
        if not os.path.isfile(inFile):
            return

        tops = self.topDirsToWatch
        if not any(inFile.startswith(top) for top in tops):
            # try to swap an absolute top-dir prefix for the
            # configured (relative) spelling of that dir
            relative = None
            for top in tops:
                absTop = os.path.abspath(top)
                if inFile.startswith(absTop):
                    relative = top + inFile[len(absTop):]
                    break
            if relative is None:
                raise ValueError("can't correct %s to start with one of %s" %
                                 (inFile, self.topDirsToWatch))
            inFile = relative

        extension = os.path.splitext(inFile)[1]
        if extension != '.n3':
            return

        # an n3 file with rules makes it all the way past this reading
        # and the serialization. Then, on the receiving side, a
        # SyncedGraph calls graphFromNQuad on the incoming data and
        # has a parse error. I'm not sure where this should be fixed
        # yet.
        if '-rules' in inFile:
            return

        # for legacy versions, compile all the config stuff you want
        # read into one file called config.n3. New versions won't read
        # it.
        if inFile.endswith("config.n3"):
            return

        ctx = self.uriFromFile(inFile)
        graphFile = GraphFile(self.notifier, inFile, ctx,
                              self.patch, self.getSubgraph)
        self.graphFiles[ctx] = graphFile
        log.info("%s do initial read", inFile)
        graphFile.reread()
 

	
 
    def aboutToPatch(self, ctx):
        """
        warn us that a patch is about to come to this context. it's more
        straightforward to create the new file now

        this is meant to make the file before we add triples, so we
        wouldn't see the blank file and lose those triples. But it
        didn't work, so there are other measures that make us not lose
        the triples from a new file. Calling this before patching the
        graph is still a reasonable thing to do, though.

        If ctx already has an entry in self.graphFiles, nothing
        happens. Otherwise a GraphFile for self.fileForUri(ctx) is
        created and stored; fileForUri raises ValueError for a ctx
        that doesn't start with self.topUri.

        The context's subgraph is deliberately not fetched here:
        nothing in this method uses it.
        """
        if ctx in self.graphFiles:
            return

        outFile = self.fileForUri(ctx)
        log.info("starting new file %r", outFile)
        self.graphFiles[ctx] = GraphFile(self.notifier, outFile, ctx,
                                         self.patch, self.getSubgraph)
 

	
 
    def dirtyFiles(self, ctxs):
        """mark dirty the files that we watch in these contexts.

        For each ctx, in order, the current subgraph is fetched with
        self.getSubgraph and handed to the dirty() method of that
        context's entry in self.graphFiles.

        the ctx might not be a file that we already read; it might be
        for a new file we have to create, or it might be for a
        transient context that we're not going to save

        if it's a ctx with no file, error (the self.graphFiles lookup
        raises KeyError)
        """
        for ctx in ctxs:
            subgraph = self.getSubgraph(ctx)
            graphFile = self.graphFiles[ctx]
            graphFile.dirty(subgraph)
 

	
 
    def uriFromFile(self, filename):
        """
        context URI for an .n3 file: self.topUri followed by filename
        with its '.n3' suffix removed.

        filename must end with '.n3' (checked with an assert) and must
        start with one of self.topDirsToWatch, else ValueError.
        """
        suffix = '.n3'
        assert filename.endswith(suffix), filename
        matchingTops = [t for t in self.topDirsToWatch
                        if filename.startswith(t)]
        if not matchingTops:
            raise ValueError("filename %s doesn't start with any of %s" %
                             (filename, self.topDirsToWatch))
        stem = filename[:-len(suffix)]
        return URIRef(self.topUri + stem)
 

	
 
    def fileForUri(self, ctx):
        """
        filename for a context URI: whatever follows self.topUri in
        ctx, with '.n3' appended.

        ctx must be a URIRef (checked with an assert); a ctx that
        doesn't start with self.topUri raises ValueError.
        """
        assert isinstance(ctx, URIRef), ctx
        prefix = self.topUri
        if ctx.startswith(prefix):
            return ctx[len(prefix):] + ".n3"
        raise ValueError("don't know what filename to use for %s" % ctx)
 

	
 
        
 
class Db(object):
    """
    the master graph, all the connected clients, all the files we're watching
    """
    def __init__(self, topDirsToWatch):
        """
        start with an empty graph and no clients, then build the
        WatchedFiles for topDirsToWatch and log a summary of the
        resulting graph
        """
        self.clients = []
        self.graph = ConjunctiveGraph()

        # WatchedFiles is handed self.patch; patch() copes with being
        # called before self.watchedFiles has been assigned (see the
        # hasattr check there)
        self.watchedFiles = WatchedFiles(topDirsToWatch,
                                         self.patch, self.getSubgraph)

        self.summarizeToLog()

    def patch(self, p, dueToFileChange=False):
        """
        apply this patch to the master graph then notify everyone about it

        dueToFileChange if this is a patch describing an edit we read
        *from* the file (such that we shouldn't write it back to the file)

        if p has a senderUpdateUri attribute, we won't send this patch
        back to the sender with that updateUri
        """
        ctx = p.getContext()
        log.info("patching graph %s -%d +%d",
                 ctx, len(p.delQuads), len(p.addQuads))

        if hasattr(self, 'watchedFiles'): # not available during startup
            self.watchedFiles.aboutToPatch(ctx)

        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
        senderUpdateUri = getattr(p, 'senderUpdateUri', None)

        # iterate over a snapshot: if an errback fires synchronously,
        # clientErrored removes from self.clients, and mutating the
        # list we're looping over would skip the next client
        for c in list(self.clients):
            if c.updateUri == senderUpdateUri:
                # this client has self-applied the patch already
                continue
            d = c.sendPatch(p)
            d.addErrback(self.clientErrored, c)
        if not dueToFileChange:
            self.watchedFiles.dirtyFiles([ctx])
        sendToLiveClients(asJson=p.jsonRepr)

    def clientErrored(self, err, c):
        """
        errback for a failed sendPatch to client c. Only ConnectError
        is handled (err.trap re-raises anything else); the client is
        dropped and the live pages are told about the new client list.
        """
        err.trap(twisted.internet.error.ConnectError)
        log.info("connection error- dropping client %r", c)
        try:
            self.clients.remove(c)
        except ValueError:
            # an earlier failed send already dropped this client, so
            # the client list hasn't changed and there's nothing to announce
            return
        self.sendClientsToAllLivePages()

    def summarizeToLog(self):
        """log the total statement count and the count per context"""
        log.info("contexts in graph (%s total stmts):", len(self.graph))
        for c in self.graph.contexts():
            log.info("  %s: %s statements",
                     c.identifier, len(self.getSubgraph(c.identifier)))

    def getSubgraph(self, uri):
        """
        this is meant to return a live view of the given subgraph, but
        if i'm still working around an rdflib bug, it might return a
        copy

        and it's returning triples, but I think quads would be better
        """
        # this is returning an empty Graph :(
        #return self.graph.get_context(uri)

        # so instead, copy the matching statements into a new Graph
        g = Graph()
        for s in self.graph.triples(ALLSTMTS, uri):
            g.add(s)
        return g

    def addClient(self, updateUri, label):
        """
        register a client to receive patches at updateUri, replacing
        any existing client(s) with the same updateUri, then tell the
        live pages about the new client list
        """
        # rebuild in place (keeping the same list object) instead of
        # calling remove() while iterating, which skips the element
        # after each removal
        self.clients[:] = [c for c in self.clients
                           if not (c.updateUri == updateUri)]

        log.info("new client %s at %s", label, updateUri)
        self.clients.append(Client(updateUri, label, self))
        self.sendClientsToAllLivePages()

    def sendClientsToAllLivePages(self):
        """send the updateUri and label of every client to the live clients"""
        sendToLiveClients({"clients":[
            dict(updateUri=c.updateUri, label=c.label)
            for c in self.clients]})
 

	
 
class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    serves the whole master graph, serialized in a format picked from
    the request's Accept header
    """
    def get(self):
        # exact-match lookup on the Accept header value; anything not
        # listed here (including a missing header) gets n3
        serializationFor = {
            'text/plain': 'nt',
            'application/n-quads': 'nquads',
        }
        accept = self.request.headers.get('accept', '')
        chosen = serializationFor.get(accept, 'n3')
        body = self.settings.db.graph.serialize(format=chosen)
        self.write(body)
 

	
 
class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    patch endpoint: PUT is handled by the function that
    makePatchEndpointPutMethod builds around db.patch; GET does nothing
    """
    def __init__(self, *args, **kw):
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
        putImpl = makePatchEndpointPutMethod(self.settings.db.patch)

        def put():
            return putImpl(self)
        # set on the instance, so this is what self.put resolves to
        self.put = put

    def get(self):
        pass
 

	
 
class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
    """
    POST registers a client with the db, using the "clientUpdate" and
    "label" request arguments; GET does nothing
    """
    def get(self):
        pass

    def post(self):
        clientUpdateUri = self.get_argument("clientUpdate")
        try:
            label = self.get_argument("label")
            self.settings.db.addClient(clientUpdateUri, label)
        except:
            # print the traceback here as well, then let the error
            # propagate unchanged
            import traceback
            traceback.print_exc()
            raise
 

	
 
# every currently registered live client; members need a sendMessage method
liveClients = set()
def sendToLiveClients(d=None, asJson=None):
    """
    send one message to every member of liveClients. A truthy asJson
    is sent as-is; otherwise d is serialized with json.dumps.
    """
    payload = asJson if asJson else json.dumps(d)
    for client in liveClients:
        client.sendMessage(payload)
 

	
 
class Live(cyclone.websocket.WebSocketHandler):
    """
    websocket for live pages: each open socket is kept in liveClients
    so sendToLiveClients reaches it, and any message received is
    echoed back to its sender
    """

    def connectionMade(self, *args, **kwargs):
        log.info("websocket opened")
        # register before broadcasting so this socket also gets the
        # client list
        liveClients.add(self)
        self.settings.db.sendClientsToAllLivePages()

    def connectionLost(self, reason):
        log.info("websocket closed")
        # discard, not remove: don't raise KeyError if this socket was
        # never registered (connectionLost without a completed
        # connectionMade)
        liveClients.discard(self)

    def messageReceived(self, message):
        log.info("got message %s" % message)
        self.sendMessage(message)
 

	
 
class NoExts(cyclone.web.StaticFileHandler):
    # .xhtml pages can be get() without .xhtml on them
    def get(self, path, *args, **kw):
        """
        serve path as a static file, first appending ".xhtml" when
        path is non-empty and contains no '.' at all
        """
        extensionless = bool(path) and '.' not in path
        if extensionless:
            path += ".xhtml"
        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)
 

	
 
# Script entry point: set up logging, build the Db from the directory
# named by $LIGHT9_SHOW, and serve the handlers over HTTP on port 8051.
if __name__ == "__main__":
    logging.basicConfig()
    # the classes in this file log through a global named `log`;
    # this binds it to the root logger
    log = logging.getLogger()

    parser = optparse.OptionParser()
    parser.add_option("-v", "--verbose", action="store_true",
                      help="logging.DEBUG")
    (options, args) = parser.parse_args()

    # -v switches the root logger from INFO to DEBUG
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)

    # LIGHT9_SHOW must be set in the environment (KeyError otherwise);
    # it is the single top directory to watch
    db = Db(topDirsToWatch=[os.environ['LIGHT9_SHOW']])

    # twisted's own log output goes to stdout
    from twisted.python import log as twlog
    twlog.startLogging(sys.stdout)

    port = 8051
    # debug and db are passed as Application keyword settings; the
    # handlers read the db back as self.settings.db
    reactor.listenTCP(port, cyclone.web.Application(handlers=[
        (r'/live', Live),
        (r'/graph', GraphResource),
        (r'/patches', Patches),
        (r'/graphClients', GraphClients),

        # everything else is a static file under light9/rdfdb/web
        (r'/(.*)', NoExts,
         {"path" : "light9/rdfdb/web",
          "default_filename" : "index.xhtml"}),

        ], debug=True, db=db))
    log.info("serving on %s" % port)
    # NOTE(review): prof is defined outside this view; presumably
    # this just calls reactor.run() when profile=False -- confirm
    prof.run(reactor.run, profile=False)
0 comments (0 inline, 0 general)