Mercurial > code > home > repos > rdfdb
diff rdfdb/service.py @ 65:9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connecton
Ignore-this: 659d5fab58791d5d570555d095e4ad39
(initiated by syncedgraph) and sending patches both ways on that instead
of having two open ports and a lot more interactions to worry about.
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Fri, 31 May 2019 21:40:00 +0000 |
parents | dbf47ed931d9 |
children | 109fefea80a7 |
line wrap: on
line diff
--- a/rdfdb/service.py Fri May 31 21:39:02 2019 +0000 +++ b/rdfdb/service.py Fri May 31 21:40:00 2019 +0000 @@ -1,9 +1,9 @@ import sys, optparse, logging, json, os, time, itertools -from typing import Callable, Dict, List, Set, Optional, Union +from typing import Dict, List, Optional from greplin.scales.cyclonehandler import StatsHandler from greplin import scales -from twisted.internet import reactor, defer, task +from twisted.internet import reactor, task from twisted.internet.inotify import IN_CREATE, INotify from twisted.python.failure import Failure from twisted.python.filepath import FilePath @@ -14,8 +14,6 @@ from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile, DirUriMap from rdfdb.graphfile import GraphFile, PatchCb, GetSubgraph from rdfdb.patch import Patch, ALLSTMTS -from rdfdb.patchreceiver import makePatchEndpointPutMethod -from rdfdb.patchsender import sendPatch from rdfdb.rdflibpatch import patchQuads # move this out @@ -46,49 +44,6 @@ pass -class Client(object): - """ - one of our syncedgraph clients - """ - - def __init__(self, updateUri: URIRef, label: str): - self.label = label - # todo: updateUri is used publicly to compare clients. Replace - # it with Client.__eq__ so WsClient doesn't have to fake an - # updateUri. - self.updateUri = updateUri - - def __repr__(self): - return "<%s client at %s>" % (self.label, self.updateUri) - - def sendPatch(self, p: Patch) -> defer.Deferred: - """ - returns deferred. error will be interpreted as the client being - broken. - """ - return sendPatch(self.updateUri, p) - - -class WsClient(object): - - def __init__(self, connectionId: str, sendMessage: Callable[[str], None]): - self.updateUri = URIRef(connectionId) - self.sendMessage = sendMessage - - def __repr__(self): - return "<WsClient %s>" % self.updateUri - - def sendPatch(self, p: Patch) -> defer.Deferred: - self.sendMessage(p.makeJsonRepr()) - return defer.succeed(None) - - -def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None: - """send the client the whole graph contents""" - log.info("sending all graphs to %r..." % client) - client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[])) - log.info("...sent.") - class WatchedFiles(object): """ @@ -220,13 +175,59 @@ self.graphFiles[ctx].dirty(g) +_wsClientSerial = itertools.count(0) + + +class WebsocketClient(cyclone.websocket.WebSocketHandler): + """ + Send patches to the client (starting with a client who has 0 + statements) to keep it in sync with the graph. + + Accept patches from the client, and assume that the client has + already applied them to its local graph. + + Treat a disconnect as 'out of sync'. Either the client thinks it + is out of sync and wants to start over, or we can't apply a patch + correctly therefore we disconnect to make the client start over. + + This socket may also carry some special messages meant for the + rdfdb web UI, e.g. about who is connected, etc. + """ + connectionId: str + + def connectionMade(self, *args, **kwargs) -> None: + self.connectionId = f'WS{next(_wsClientSerial)}' + + self.sendMessage(json.dumps({'connectedAs': self.connectionId})) + log.info("new ws client %r", self) + self.settings.db.addClient(self) + + def connectionLost(self, reason): + log.info("bye ws client %r: %s", self, reason) + self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)), + self) + + def messageReceived(self, message: bytes): + if message == b'PING': + self.sendMessage('PONG') + return + log.debug("got message from %r: %s", self, message[:32]) + p = Patch(jsonRepr=message.decode('utf8')) + self.settings.db.patch(p, sender=self.connectionId) + + def sendPatch(self, p: Patch): + self.sendMessage(p.makeJsonRepr()) + + def __repr__(self): + return f"<SyncedGraph client {self.connectionId}>" + class Db(object): """ the master graph, all the connected clients, all the files we're watching """ def __init__(self, dirUriMap: DirUriMap, addlPrefixes): - self.clients: List[Union[Client, WsClient]] = [] + self.clients: List[WebsocketClient] = [] self.graph = ConjunctiveGraph() stats.graphLen = len(self.graph) stats.clients = len(self.clients) @@ -237,50 +238,44 @@ self.summarizeToLog() @graphStats.patchFps.rate() - def patch(self, patch: Patch, dueToFileChange: bool = False) -> None: + def patch(self, patch: Patch, sender: Optional[str]=None, dueToFileChange: bool = False) -> None: """ apply this patch to the master graph then notify everyone about it dueToFileChange if this is a patch describing an edit we read *from* the file (such that we shouldn't write it back to the file) - - if p has a senderUpdateUri attribute, we won't send this patch - back to the sender with that updateUri """ ctx = patch.getContext() log.info("patching graph %s -%d +%d" % (ctx, len(patch.delQuads), len(patch.addQuads))) - if hasattr(self, 'watchedFiles'): # not available during startup - self.watchedFiles.aboutToPatch(ctx) + self.watchedFiles.aboutToPatch(ctx) + # an error here needs to drop the sender, and reset everyone + # else if we can't rollback the failing patch. patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True) stats.graphLen = len(self.graph) - self._sendPatch(patch) + + self._syncPatchToOtherClients(patch, sender) if not dueToFileChange: self.watchedFiles.dirtyFiles([ctx]) - sendToLiveClients(asJson=patch.jsonRepr) graphStats.statements = len(self.graph) - def _sendPatch(self, p: Patch): - senderUpdateUri: Optional[URIRef] = getattr(p, 'senderUpdateUri', None) - + def _syncPatchToOtherClients(self, p: Patch, sender: str): for c in self.clients: - if c.updateUri == senderUpdateUri: + if c.connectionId == sender: # this client has self-applied the patch already - log.debug("_sendPatch: don't resend to %r", c) + log.debug("_syncPatchToOtherClients: don't resend to %r", c) continue - log.debug('_sendPatch: send to %r', c) - d = c.sendPatch(p) - d.addErrback(self.clientErrored, c) - + log.debug('_syncPatchToOtherClients: send to %r', c) + c.sendPatch(p) + def clientErrored(self, err, c) -> None: err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect) log.info("%r %r - dropping client", c, err.getErrorMessage()) if c in self.clients: self.clients.remove(c) stats.clients = len(self.clients) - self.sendClientsToAllLivePages() def summarizeToLog(self): log.info("contexts in graph (%s total stmts):" % len(self.graph)) @@ -304,25 +299,13 @@ g.add(s) return g - def addClient(self, newClient: Union[Client, WsClient]) -> None: - for c in self.clients: - if c.updateUri == newClient.updateUri: - self.clients.remove(c) + def addClient(self, newClient: WebsocketClient) -> None: + log.info("new connection: sending all graphs to %r..." % newClient) + newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[])) - log.info("new client %r" % newClient) - sendGraphToClient(self.graph, newClient) self.clients.append(newClient) - self.sendClientsToAllLivePages() stats.clients = len(self.clients) - def sendClientsToAllLivePages(self) -> None: - sendToLiveClients({ - "clients": [ - dict(updateUri=c.updateUri.toPython(), label=repr(c)) - for c in self.clients - ] - }) - class GraphResource(cyclone.web.RequestHandler): @@ -344,32 +327,6 @@ self.write(self.settings.db.graph.serialize(format=format)) -class Patches(cyclone.web.RequestHandler): - - def __init__(self, *args, **kw): - cyclone.web.RequestHandler.__init__(self, *args, **kw) - p = makePatchEndpointPutMethod(self.settings.db.patch) - self.put = lambda: p(self) - - def get(self): - pass - - -class GraphClients(cyclone.web.RequestHandler): - - def get(self): - pass - - def post(self) -> None: - upd = URIRef(self.get_argument("clientUpdate")) - try: - self.settings.db.addClient(Client(upd, self.get_argument("label"))) - except: - import traceback - traceback.print_exc() - raise - - class Prefixes(cyclone.web.RequestHandler): def post(self): @@ -379,66 +336,6 @@ {}).update(suggestion['prefixes']) -_wsClientSerial = itertools.count(0) - - -class WebsocketClient(cyclone.websocket.WebSocketHandler): - - wsClient: Optional[WsClient] = None - - def connectionMade(self, *args, **kwargs) -> None: - connectionId = f'WS{next(_wsClientSerial)}' - - self.wsClient = WsClient(connectionId, self.sendMessage) - self.sendMessage(json.dumps({'connectedAs': connectionId})) - log.info("new ws client %r", self.wsClient) - self.settings.db.addClient(self.wsClient) - - def connectionLost(self, reason): - log.info("bye ws client %r", self.wsClient) - self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)), - self.wsClient) - - def messageReceived(self, message: bytes): - if message == b'PING': - self.sendMessage('PONG') - return - log.info("got message from %r: %s", self.wsClient, message) - p = Patch(jsonRepr=message.decode('utf8')) - assert self.wsClient is not None - p.senderUpdateUri = self.wsClient.updateUri - self.settings.db.patch(p) - - -class Live(cyclone.websocket.WebSocketHandler): - - def connectionMade(self, *args, **kwargs): - log.info("websocket opened") - liveClients.add(self) - stats.liveClients = len(liveClients) - self.settings.db.sendClientsToAllLivePages() - - def connectionLost(self, reason): - log.info("websocket closed") - liveClients.remove(self) - stats.liveClients = len(liveClients) - - def messageReceived(self, message: bytes): - log.info("got message %s" % message) - # this is just a leftover test? - self.sendMessage(message.decode('utf8')) - - -liveClients: Set[Live] = set() -stats.liveClients = len(liveClients) - -def sendToLiveClients(d: Optional[Dict]=None, asJson: Optional[str]=None): - msg: str = asJson or json.dumps(d) - assert isinstance(msg, str), repr(msg) - for c in liveClients: - c.sendMessage(msg) - - class NoExts(cyclone.web.StaticFileHandler): # .html pages can be get() without .html on them def get(self, path, *args, **kw): @@ -480,10 +377,7 @@ reactor.listenTCP( port, cyclone.web.Application(handlers=[ - (r'/live', Live), (r'/graph', GraphResource), - (r'/patches', Patches), - (r'/graphClients', GraphClients), (r'/syncedGraph', WebsocketClient), (r'/prefixes', Prefixes), (r'/stats/(.*)', StatsHandler, {'serverName': 'rdfdb'}),