Changeset - 2fc0e726a3c3
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 8 years ago 2017-06-10 02:06:30
drewp@bigasterisk.com
rdfdb alternate graph formats for timing tests
Ignore-this: 4aea5721454500c088927b10ca17aae
1 file changed with 8 insertions and 0 deletions:
0 comments (0 inline, 0 general)
bin/rdfdb
Show inline comments
 
@@ -293,192 +293,200 @@ class Db(object):
 
    """
 
    def __init__(self, dirUriMap, addlPrefixes):
 
      
 
        self.clients = []
 
        self.graph = ConjunctiveGraph()
 

	
 
        self.watchedFiles = WatchedFiles(dirUriMap,
 
                                         self.patch, self.getSubgraph,
 
                                         addlPrefixes)
 
        
 
        self.summarizeToLog()
 

	
 
    def patch(self, p, dueToFileChange=False):
 
        """
 
        apply this patch to the master graph then notify everyone about it
 

	
 
        dueToFileChange if this is a patch describing an edit we read
 
        *from* the file (such that we shouldn't write it back to the file)
 

	
 
        if p has a senderUpdateUri attribute, we won't send this patch
 
        back to the sender with that updateUri
 
        """
 
        ctx = p.getContext()
 
        log.info("patching graph %s -%d +%d" % (
 
            ctx, len(p.delQuads), len(p.addQuads)))
 

	
 
        if hasattr(self, 'watchedFiles'): # not available during startup
 
            self.watchedFiles.aboutToPatch(ctx)
 
        
 
        patchQuads(self.graph, p.delQuads, p.addQuads, perfect=True)
 
        self._sendPatch(p)
 
        if not dueToFileChange:
 
            self.watchedFiles.dirtyFiles([ctx])
 
        sendToLiveClients(asJson=p.jsonRepr)
 

	
 
    def _sendPatch(self, p):
 
        senderUpdateUri = getattr(p, 'senderUpdateUri', None)
 

	
 
        for c in self.clients:
 
            if c.updateUri == senderUpdateUri:
 
                # this client has self-applied the patch already
 
                continue
 
            d = c.sendPatch(p)
 
            d.addErrback(self.clientErrored, c)
 
        
 
    def clientErrored(self, err, c):
 
        err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
 
        log.info("%r %r - dropping client", c, err.getErrorMessage())
 
        if c in self.clients:
 
            self.clients.remove(c)
 
        self.sendClientsToAllLivePages()
 

	
 
    def summarizeToLog(self):
 
        log.info("contexts in graph (%s total stmts):" % len(self.graph))
 
        for c in self.graph.contexts():
 
            log.info("  %s: %s statements" %
 
                     (c.identifier, len(self.getSubgraph(c.identifier))))
 

	
 
    def getSubgraph(self, uri):
 
        """
 
        this is meant to return a live view of the given subgraph, but
 
        if i'm still working around an rdflib bug, it might return a
 
        copy
 

	
 
        and it's returning triples, but I think quads would be better
 
        """
 
        # this is returning an empty Graph :(
 
        #return self.graph.get_context(uri)
 

	
 
        g = Graph()
 
        for s in self.graph.triples(ALLSTMTS, uri):
 
            g.add(s)
 
        return g
 

	
 
    def addClient(self, newClient):
 
        [self.clients.remove(c)
 
         for c in self.clients if c.updateUri == newClient.updateUri]
 

	
 
        log.info("new client %r" % newClient)
 
        sendGraphToClient(self.graph, newClient)
 
        self.clients.append(newClient)
 
        self.sendClientsToAllLivePages()
 

	
 
    def sendClientsToAllLivePages(self):
 
        sendToLiveClients({"clients":[
 
            dict(updateUri=c.updateUri, label=repr(c))
 
            for c in self.clients]})
 

	
 
class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def get(self):
 
        accept = self.request.headers.get('accept', '')
 
        format = 'n3'
 
        if accept == 'text/plain':
 
            format = 'nt'
 
        elif accept == 'application/n-quads':
 
            format = 'nquads'
 
        elif accept == 'pickle':
 
            # don't use this; it's just for speed comparison
 
            import cPickle as pickle
 
            pickle.dump(self.settings.db.graph, self, protocol=2)
 
            return
 
        elif accept == 'msgpack':
 
            self.write(repr(self.settings.db.graph.__getstate__))
 
            return
 
        self.write(self.settings.db.graph.serialize(format=format))
 

	
 
class Patches(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def __init__(self, *args, **kw):
 
        cyclone.web.RequestHandler.__init__(self, *args, **kw)
 
        p = makePatchEndpointPutMethod(self.settings.db.patch)
 
        self.put = lambda: p(self)
 

	
 
    def get(self):
 
        pass
 

	
 
class GraphClients(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def get(self):
 
        pass
 

	
 
    def post(self):
 
        upd = self.get_argument("clientUpdate")
 
        try:
 
            self.settings.db.addClient(Client(upd, self.get_argument("label")))
 
        except:
 
            import traceback
 
            traceback.print_exc()
 
            raise
 
            
 
class Prefixes(PrettyErrorHandler, cyclone.web.RequestHandler):
 
    def post(self):
 
        suggestion = json.loads(self.request.body)
 
        addlPrefixes = self.settings.db.watchedFiles.addlPrefixes
 
        addlPrefixes.setdefault(URIRef(suggestion['ctx']), {}).update(suggestion['prefixes'])
 
    
 
_wsClientSerial = 0
 
class WebsocketClient(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        global _wsClientSerial
 
        connectionId = 'connection-%s' % _wsClientSerial
 
        _wsClientSerial += 1
 

	
 
        self.wsClient = WsClient(connectionId, self.sendMessage)
 
        log.info("new ws client %r", self.wsClient)
 
        self.settings.db.addClient(self.wsClient)
 

	
 
    def connectionLost(self, reason):
 
        log.info("bye ws client %r", self.wsClient)
 
        self.settings.db.clientErrored(
 
            Failure(WebsocketDisconnect(reason)), self.wsClient)
 

	
 
    def messageReceived(self, message):
 
        if message == 'PING':
 
            self.sendMessage('PONG')
 
            return
 
        log.info("got message from %r: %s", self.wsClient, message)
 
        p = Patch(jsonRepr=message)
 
        p.senderUpdateUri = self.wsClient.updateUri
 
        self.settings.db.patch(p)
 

	
 
liveClients = set()
 
def sendToLiveClients(d=None, asJson=None):
 
    j = asJson or json.dumps(d)
 
    for c in liveClients:
 
        c.sendMessage(j)
 

	
 
class Live(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        log.info("websocket opened")
 
        liveClients.add(self)
 
        self.settings.db.sendClientsToAllLivePages()
 

	
 
    def connectionLost(self, reason):
 
        log.info("websocket closed")
 
        liveClients.remove(self)
 

	
 
    def messageReceived(self, message):
 
        log.info("got message %s" % message)
 
        self.sendMessage(message)
 

	
 
class NoExts(cyclone.web.StaticFileHandler):
 
    # .html pages can be get() without .html on them
 
    def get(self, path, *args, **kw):
 
        if path and '.' not in path:
 
            path = path + ".html"
 
        cyclone.web.StaticFileHandler.get(self, path, *args, **kw)
 

	
 

	
 
        
 
if __name__ == "__main__":
 
    logging.basicConfig()
 
    log = logging.getLogger()
 

	
 
    parser = optparse.OptionParser()
 
    parser.add_option("-v", "--verbose", action="store_true",
 
                      help="logging.DEBUG")
 
    (options, args) = parser.parse_args()
 

	
 
    log.setLevel(logging.DEBUG if options.verbose else logging.INFO)
0 comments (0 inline, 0 general)