diff --git a/bin/rdfdb b/bin/rdfdb --- a/bin/rdfdb +++ b/bin/rdfdb @@ -107,9 +107,10 @@ Our web ui: to this, focused on that resource """ -from twisted.internet import reactor +from twisted.internet import reactor, defer import twisted.internet.error from twisted.python.filepath import FilePath +from twisted.python.failure import Failure from twisted.internet.inotify import humanReadableMask, IN_CREATE import sys, optparse, logging, json, os import cyclone.web, cyclone.httpclient, cyclone.websocket @@ -129,10 +130,12 @@ log.setLevel(logging.DEBUG) from lib.cycloneerr import PrettyErrorHandler +class WebsocketDisconnect(ValueError): + pass + def sendGraphToClient(graph, client): """send the client the whole graph contents""" - log.info("sending all graphs to %s at %s" % - (client.label, client.updateUri)) + log.info("sending all graphs to %r" % client) client.sendPatch(Patch( addQuads=graph.quads(ALLSTMTS), delQuads=[])) @@ -144,14 +147,33 @@ class Client(object): """ def __init__(self, updateUri, label): self.label = label + # todo: updateUri is used publicly to compare clients. Replace + # it with Client.__eq__ so WsClient doesn't have to fake an + # updateUri. self.updateUri = updateUri def __repr__(self): return "<%s client at %s>" % (self.label, self.updateUri) def sendPatch(self, p): + """ + returns deferred. error will be interpreted as the client being + broken. + """ return sendPatch(self.updateUri, p) + +class WsClient(object): + def __init__(self, connectionId, sendMessage): + self.updateUri = connectionId + self.sendMessage = sendMessage + def __repr__(self): + return "" % self.updateUri + + def sendPatch(self, p): + self.sendMessage(p.makeJsonRepr()) + return defer.succeed(None) + class WatchedFiles(object): """ find files, notice new files. @@ -304,8 +326,8 @@ class Db(object): d.addErrback(self.clientErrored, c) def clientErrored(self, err, c): - err.trap(twisted.internet.error.ConnectError) - log.info("connection error- dropping client %r" % c) + err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect) + log.info("%r %r - dropping client", c, err.getErrorMessage()) self.clients.remove(c) self.sendClientsToAllLivePages() @@ -335,14 +357,14 @@ class Db(object): [self.clients.remove(c) for c in self.clients if c.updateUri == newClient.updateUri] - log.info("new client %s at %s" % (newClient.label, newClient.updateUri)) + log.info("new client %r" % newClient) sendGraphToClient(self.graph, newClient) self.clients.append(newClient) self.sendClientsToAllLivePages() def sendClientsToAllLivePages(self): sendToLiveClients({"clients":[ - dict(updateUri=c.updateUri, label=c.label) + dict(updateUri=c.updateUri, label=repr(c)) for c in self.clients]}) class GraphResource(PrettyErrorHandler, cyclone.web.RequestHandler): @@ -376,6 +398,28 @@ class GraphClients(PrettyErrorHandler, c import traceback traceback.print_exc() raise + +_wsClientSerial = 0 +class WebsocketClient(cyclone.websocket.WebSocketHandler): + + def connectionMade(self, *args, **kwargs): + global _wsClientSerial + self.connectionId = 'connection-%s' % _wsClientSerial + log.info("new ws client %r", self.connectionId) + _wsClientSerial += 1 + + self.wsClient = WsClient(self.connectionId, self.sendMessage) + self.settings.db.addClient(self.wsClient) + + def connectionLost(self, reason): + log.info("bye ws client %r", self.connectionId) + self.settings.db.clientErrored( + Failure(WebsocketDisconnect(reason)), self.wsClient) + + def messageReceived(self, message): + log.info("got message from %s: %s", self.connectionId, message) + # how + self.sendMessage(message) liveClients = set() def sendToLiveClients(d=None, asJson=None): @@ -427,6 +471,7 @@ if __name__ == "__main__": (r'/graph', GraphResource), (r'/patches', Patches), (r'/graphClients', GraphClients), + (r'/syncedGraph', WebsocketClient), (r'/(.*)', NoExts, {"path" : "light9/rdfdb/web",