changeset 65:9bc9de580033
big rewrite of rdfdb and syncedgraph to use a single websocket connection
(initiated by syncedgraph) and sending patches both ways on that instead
of having two open ports and a lot more interactions to worry about.
| | |
|---|---|
| author | Drew Perttula <drewp@bigasterisk.com> |
| date | Fri, 31 May 2019 21:40:00 +0000 |
| parents | c1a9403e5d21 |
| children | 109fefea80a7 |
| files | rdfdb/patchreceiver.py rdfdb/patchsender.py rdfdb/service.py rdfdb/syncedgraph.py |
| diffstat | 4 files changed, 131 insertions(+), 444 deletions(-) |
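The protocol this changeset introduces is easiest to see from a client's point of view: a client opens one websocket to `/syncedGraph`, the server immediately names the connection (e.g. `{"connectedAs": "WS0"}`) and sends a patch containing the entire current graph, and after that JSON patches flow in both directions on the same socket (with a `PING`/`PONG` text keepalive). Below is a minimal read-only observer sketch, not part of the changeset; the host and port are hypothetical, and it assumes only the message shapes visible in the diff.

```python
# Minimal observer of the new single-socket protocol (illustrative only).
import asyncio, json
import websockets  # third-party client library, used here instead of Twisted for brevity


async def watch(url: str = 'ws://localhost:9999/syncedGraph') -> None:
    async with websockets.connect(url) as ws:
        # first message names this connection, e.g. {"connectedAs": "WS0"}
        greeting = json.loads(await ws.recv())
        print('server named this connection:', greeting.get('connectedAs'))
        # the first patch adds the whole current graph, since a new client
        # starts from zero statements; later patches are incremental edits
        while True:
            patch = json.loads(await ws.recv())
            print('got patch with top-level keys:', sorted(patch))


asyncio.run(watch())
```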
--- a/rdfdb/patchreceiver.py	Fri May 31 21:39:02 2019 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,81 +0,0 @@
-import logging, traceback, urllib.request, urllib.parse, urllib.error
-
-from twisted.internet import reactor
-import cyclone.web
-import treq
-
-from rdfdb.patch import Patch
-
-log = logging.getLogger('syncedgraph')
-
-
-class PatchReceiver(object):
-    """
-    runs a web server in this process and registers it with the rdfdb
-    master. See onPatch for what happens when the rdfdb master sends
-    us a patch
-    """
-
-    def __init__(self, rdfdbRoot, host, label, onPatch):
-        """
-        label is what we'll call ourselves to the rdfdb server
-
-        onPatch is what we call back when the server sends a patch
-        """
-        self.rdfdbRoot = rdfdbRoot
-        listen = reactor.listenTCP(
-            0,
-            cyclone.web.Application(handlers=[
-                (r'/update', makePatchEndpoint(onPatch)),
-            ]))
-        port = listen._realPortNumber  # what's the right call for this?
-
-        self.updateResource = 'http://%s:%s/update' % (host, port)
-        log.info("listening on %s" % port)
-        self._register(label)
-
-    def _register(self, label):
-        url = (self.rdfdbRoot + 'graphClients').encode('utf8')
-        body = urllib.parse.urlencode([(b'clientUpdate', self.updateResource),
-                                       (b'label', label)]).encode('utf8')
-        treq.post(url, data=body, headers={
-            b'Content-Type': [b'application/x-www-form-urlencoded']
-        }).addCallbacks(
-            self._done,
-            lambda err: self._registerError(err, url, body)
-        )
-        log.info("registering with rdfdb at %s", url)
-
-    def _registerError(self, err, url, body):
-        log.error('registering to url=%r body=%r', url, body)
-        log.error(err)
-
-    def _done(self, x):
-        log.debug("registered with rdfdb")
-
-
-def makePatchEndpointPutMethod(cb):
-
-    def put(self) -> None:
-        assert isinstance(self, cyclone.web.RequestHandler)
-        try:
-            p = Patch(jsonRepr=self.request.body.decode('utf8'))
-            log.debug("received patch -%d +%d" %
-                      (len(p.delGraph), len(p.addGraph)))
-            cb(p)
-        except Exception as e:
-            self.set_status(500)
-            # a string that will look good in rdfdb's log
-            self.write(repr(e))
-            traceback.print_exc()
-            raise
-
-    return put
-
-
-def makePatchEndpoint(cb):
-
-    class Update(cyclone.web.RequestHandler):
-        put = makePatchEndpointPutMethod(cb)
-
-    return Update
--- a/rdfdb/patchsender.py	Fri May 31 21:39:02 2019 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,141 +0,0 @@
-import logging, time
-from typing import List, Tuple, Optional
-
-from rdflib import URIRef
-from twisted.internet import defer
-import treq
-
-from rdfdb.patch import Patch
-
-log = logging.getLogger('patchsender')
-
-SendResult = defer.Deferred  # to None
-
-
-class PatchSender(object):
-    """
-    SyncedGraph may generate patches faster than we can send
-    them. This object buffers and may even collapse patches before
-    they go the server
-    """
-
-    def __init__(self, target: URIRef, myUpdateResource):
-        """
-        target is the URI we'll send patches to
-
-        myUpdateResource is the URI for this sender of patches, which
-        maybe needs to be the actual requestable update URI for
-        sending updates back to us
-        """
-        self.target = target
-        self.myUpdateResource = myUpdateResource
-        self._patchesToSend: List[Tuple[Patch, SendResult]] = []
-        self._currentSendPatchRequest: Optional[SendResult] = None
-
-    def sendPatch(self, p: Patch) -> SendResult:
-        sendResult: SendResult = defer.Deferred()
-        self._patchesToSend.append((p, sendResult))
-        self._continueSending()
-        return sendResult
-
-    def cancelAll(self):
-        self._patchesToSend[:] = []
-        # we might be in the middle of a post; ideally that would be
-        # aborted, too. Since that's not coded yet, or it might be too late to
-        # abort, what should happen?
-        # 1. this could return deferred until we think our posts have stopped
-        # 2. or, other code could deal for the fact that cancelAll
-        #    isn't perfect
-
-    def _continueSending(self) -> None:
-        if not self._patchesToSend or self._currentSendPatchRequest:
-            return
-        if len(self._patchesToSend) > 1:
-            log.debug("%s patches left to send", len(self._patchesToSend))
-            # this is where we could concatenate little patches into a
-            # bigger one. Often, many statements will cancel each
-            # other out. not working yet:
-            if 0:
-                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
-                print("concat down to")
-                print('dels')
-                for q in p.delQuads:
-                    print(q)
-                print('adds')
-                for q in p.addQuads:
-                    print(q)
-                print("----")
-            else:
-                p, sendResult = self._patchesToSend.pop(0)
-        else:
-            p, sendResult = self._patchesToSend.pop(0)
-
-        self._currentSendPatchRequest = sendPatch(
-            self.target, p, senderUpdateUri=self.myUpdateResource)
-        self._currentSendPatchRequest.addCallbacks(self._sendPatchDone,
-                                                   self._sendPatchErr)
-        self._currentSendPatchRequest.chainDeferred(sendResult)
-
-    def _sendPatchDone(self, result):
-        self._currentSendPatchRequest = None
-        self._continueSending()
-
-    def _sendPatchErr(self, e):
-        self._currentSendPatchRequest = None
-        # we're probably out of sync with the master now, since
-        # SyncedGraph.patch optimistically applied the patch to our
-        # local graph already. What happens to this patch? What
-        # happens to further pending patches? Some of the further
-        # patches, especially, may be commutable with the bad one and
-        # might still make sense to apply to the master graph.
-
-        # if someday we are folding pending patches together, this
-        # would be the time to UNDO that and attempt the original
-        # separate patches again
-
-        # this should screen for 409 conflict responses and raise a
-        # special exception for that, so SyncedGraph.sendFailed can
-        # screen for only that type
-
-        # this code is going away; we're going to raise an exception that contains all the pending patches
-        log.error("_sendPatchErr")
-        log.error(e)
-        self._continueSending()
-
-
-def sendPatch(putUri: URIRef, patch: Patch, **kw) -> defer.Deferred:
-    """
-    PUT a patch as json to an http server. Returns deferred.
-
-    kwargs will become extra attributes in the toplevel json object
-    """
-    t1 = time.time()
-    body = patch.makeJsonRepr(kw)
-    jsonTime = time.time() - t1
-    intro = body[:200]
-    if len(body) > 200:
-        intro = intro + "..."
-    log.debug("send body (rendered %.1fkB in %.1fms): %s",
-              len(body) / 1024, jsonTime * 1000, intro)
-    sendTime = time.time()
-
-    def putDone(done):
-        if not str(done.code).startswith('2'):
-            def fail(content):
-                log.warn(f"Sent patch to {putUri}:")
-                log.warn(str(patch))
-                log.warn(f"Receiver failed {done.code} {content}")
-                raise ValueError("sendPatch failed")
-            return done.content().addCallback(fail)
-
-        dt = 1000 * (time.time() - sendTime)
-        log.debug("sendPatch to %s took %.1fms" % (putUri, dt))
-        return done
-
-    return treq.put(putUri.toPython(),
-                    data=body.encode('utf8'),
-                    headers={
-                        b'Content-Type': [b'application/json']
-                    },
-                    timeout=2,
-                    ).addCallback(putDone)
--- a/rdfdb/service.py	Fri May 31 21:39:02 2019 +0000
+++ b/rdfdb/service.py	Fri May 31 21:40:00 2019 +0000
@@ -1,9 +1,9 @@
 import sys, optparse, logging, json, os, time, itertools
-from typing import Callable, Dict, List, Set, Optional, Union
+from typing import Dict, List, Optional
 from greplin.scales.cyclonehandler import StatsHandler
 from greplin import scales
-from twisted.internet import reactor, defer, task
+from twisted.internet import reactor, task
 from twisted.internet.inotify import IN_CREATE, INotify
 from twisted.python.failure import Failure
 from twisted.python.filepath import FilePath
 
@@ -14,8 +14,6 @@
 from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile, DirUriMap
 from rdfdb.graphfile import GraphFile, PatchCb, GetSubgraph
 from rdfdb.patch import Patch, ALLSTMTS
-from rdfdb.patchreceiver import makePatchEndpointPutMethod
-from rdfdb.patchsender import sendPatch
 from rdfdb.rdflibpatch import patchQuads
 
 # move this out
@@ -46,49 +44,6 @@
     pass
 
 
-class Client(object):
-    """
-    one of our syncedgraph clients
-    """
-
-    def __init__(self, updateUri: URIRef, label: str):
-        self.label = label
-        # todo: updateUri is used publicly to compare clients. Replace
-        # it with Client.__eq__ so WsClient doesn't have to fake an
-        # updateUri.
-        self.updateUri = updateUri
-
-    def __repr__(self):
-        return "<%s client at %s>" % (self.label, self.updateUri)
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        """
-        returns deferred. error will be interpreted as the client being
-        broken.
-        """
-        return sendPatch(self.updateUri, p)
-
-
-class WsClient(object):
-
-    def __init__(self, connectionId: str, sendMessage: Callable[[str], None]):
-        self.updateUri = URIRef(connectionId)
-        self.sendMessage = sendMessage
-
-    def __repr__(self):
-        return "<WsClient %s>" % self.updateUri
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        self.sendMessage(p.makeJsonRepr())
-        return defer.succeed(None)
-
-
-def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None:
-    """send the client the whole graph contents"""
-    log.info("sending all graphs to %r..." % client)
-    client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[]))
-    log.info("...sent.")
-
 
 class WatchedFiles(object):
     """
@@ -220,13 +175,59 @@
             self.graphFiles[ctx].dirty(g)
 
 
+_wsClientSerial = itertools.count(0)
+
+
+class WebsocketClient(cyclone.websocket.WebSocketHandler):
+    """
+    Send patches to the client (starting with a client who has 0
+    statements) to keep it in sync with the graph.
+
+    Accept patches from the client, and assume that the client has
+    already applied them to its local graph.
+
+    Treat a disconnect as 'out of sync'. Either the client thinks it
+    is out of sync and wants to start over, or we can't apply a patch
+    correctly therefore we disconnect to make the client start over.
+
+    This socket may also carry some special messages meant for the
+    rdfdb web UI, e.g. about who is connected, etc.
+    """
+    connectionId: str
+
+    def connectionMade(self, *args, **kwargs) -> None:
+        self.connectionId = f'WS{next(_wsClientSerial)}'
+
+        self.sendMessage(json.dumps({'connectedAs': self.connectionId}))
+        log.info("new ws client %r", self)
+        self.settings.db.addClient(self)
+
+    def connectionLost(self, reason):
+        log.info("bye ws client %r: %s", self, reason)
+        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
+                                       self)
+
+    def messageReceived(self, message: bytes):
+        if message == b'PING':
+            self.sendMessage('PONG')
+            return
+        log.debug("got message from %r: %s", self, message[:32])
+        p = Patch(jsonRepr=message.decode('utf8'))
+        self.settings.db.patch(p, sender=self.connectionId)
+
+    def sendPatch(self, p: Patch):
+        self.sendMessage(p.makeJsonRepr())
+
+    def __repr__(self):
+        return f"<SyncedGraph client {self.connectionId}>"
+
+
 class Db(object):
     """
     the master graph, all the connected clients, all the files we're watching
     """
 
     def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
-        self.clients: List[Union[Client, WsClient]] = []
+        self.clients: List[WebsocketClient] = []
         self.graph = ConjunctiveGraph()
         stats.graphLen = len(self.graph)
         stats.clients = len(self.clients)
@@ -237,50 +238,44 @@
         self.summarizeToLog()
 
     @graphStats.patchFps.rate()
-    def patch(self, patch: Patch, dueToFileChange: bool = False) -> None:
+    def patch(self, patch: Patch, sender: Optional[str]=None, dueToFileChange: bool = False) -> None:
        """
        apply this patch to the master graph then notify everyone about it

        dueToFileChange if this is a patch describing an edit we read
        *from* the file (such that we shouldn't write it back to the file)
-
-        if p has a senderUpdateUri attribute, we won't send this patch
-        back to the sender with that updateUri
        """
        ctx = patch.getContext()
        log.info("patching graph %s -%d +%d" %
                 (ctx, len(patch.delQuads), len(patch.addQuads)))
 
-        if hasattr(self, 'watchedFiles'):  # not available during startup
-            self.watchedFiles.aboutToPatch(ctx)
+        self.watchedFiles.aboutToPatch(ctx)
 
+        # an error here needs to drop the sender, and reset everyone
+        # else if we can't rollback the failing patch.
         patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
         stats.graphLen = len(self.graph)
-        self._sendPatch(patch)
+
+        self._syncPatchToOtherClients(patch, sender)
         if not dueToFileChange:
             self.watchedFiles.dirtyFiles([ctx])
-        sendToLiveClients(asJson=patch.jsonRepr)
         graphStats.statements = len(self.graph)
 
-    def _sendPatch(self, p: Patch):
-        senderUpdateUri: Optional[URIRef] = getattr(p, 'senderUpdateUri', None)
-
+    def _syncPatchToOtherClients(self, p: Patch, sender: str):
         for c in self.clients:
-            if c.updateUri == senderUpdateUri:
+            if c.connectionId == sender:
                 # this client has self-applied the patch already
-                log.debug("_sendPatch: don't resend to %r", c)
+                log.debug("_syncPatchToOtherClients: don't resend to %r", c)
                 continue
-            log.debug('_sendPatch: send to %r', c)
-            d = c.sendPatch(p)
-            d.addErrback(self.clientErrored, c)
-
+            log.debug('_syncPatchToOtherClients: send to %r', c)
+            c.sendPatch(p)
+
     def clientErrored(self, err, c) -> None:
         err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
         log.info("%r %r - dropping client", c, err.getErrorMessage())
         if c in self.clients:
             self.clients.remove(c)
             stats.clients = len(self.clients)
-        self.sendClientsToAllLivePages()
 
     def summarizeToLog(self):
         log.info("contexts in graph (%s total stmts):" % len(self.graph))
@@ -304,25 +299,13 @@
                 g.add(s)
         return g
 
-    def addClient(self, newClient: Union[Client, WsClient]) -> None:
-        for c in self.clients:
-            if c.updateUri == newClient.updateUri:
-                self.clients.remove(c)
+    def addClient(self, newClient: WebsocketClient) -> None:
+        log.info("new connection: sending all graphs to %r..." % newClient)
+        newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
 
-        log.info("new client %r" % newClient)
-        sendGraphToClient(self.graph, newClient)
         self.clients.append(newClient)
-        self.sendClientsToAllLivePages()
         stats.clients = len(self.clients)
 
-    def sendClientsToAllLivePages(self) -> None:
-        sendToLiveClients({
-            "clients": [
-                dict(updateUri=c.updateUri.toPython(), label=repr(c))
-                for c in self.clients
-            ]
-        })
-
 
 class GraphResource(cyclone.web.RequestHandler):
 
@@ -344,32 +327,6 @@
         self.write(self.settings.db.graph.serialize(format=format))
 
 
-class Patches(cyclone.web.RequestHandler):
-
-    def __init__(self, *args, **kw):
-        cyclone.web.RequestHandler.__init__(self, *args, **kw)
-        p = makePatchEndpointPutMethod(self.settings.db.patch)
-        self.put = lambda: p(self)
-
-    def get(self):
-        pass
-
-
-class GraphClients(cyclone.web.RequestHandler):
-
-    def get(self):
-        pass
-
-    def post(self) -> None:
-        upd = URIRef(self.get_argument("clientUpdate"))
-        try:
-            self.settings.db.addClient(Client(upd, self.get_argument("label")))
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-
-
 class Prefixes(cyclone.web.RequestHandler):
 
     def post(self):
@@ -379,66 +336,6 @@
                        {}).update(suggestion['prefixes'])
 
 
-_wsClientSerial = itertools.count(0)
-
-
-class WebsocketClient(cyclone.websocket.WebSocketHandler):
-
-    wsClient: Optional[WsClient] = None
-
-    def connectionMade(self, *args, **kwargs) -> None:
-        connectionId = f'WS{next(_wsClientSerial)}'
-
-        self.wsClient = WsClient(connectionId, self.sendMessage)
-        self.sendMessage(json.dumps({'connectedAs': connectionId}))
-        log.info("new ws client %r", self.wsClient)
-        self.settings.db.addClient(self.wsClient)
-
-    def connectionLost(self, reason):
-        log.info("bye ws client %r", self.wsClient)
-        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
-                                       self.wsClient)
-
-    def messageReceived(self, message: bytes):
-        if message == b'PING':
-            self.sendMessage('PONG')
-            return
-        log.info("got message from %r: %s", self.wsClient, message)
-        p = Patch(jsonRepr=message.decode('utf8'))
-        assert self.wsClient is not None
-        p.senderUpdateUri = self.wsClient.updateUri
-        self.settings.db.patch(p)
-
-
-class Live(cyclone.websocket.WebSocketHandler):
-
-    def connectionMade(self, *args, **kwargs):
-        log.info("websocket opened")
-        liveClients.add(self)
-        stats.liveClients = len(liveClients)
-        self.settings.db.sendClientsToAllLivePages()
-
-    def connectionLost(self, reason):
-        log.info("websocket closed")
-        liveClients.remove(self)
-        stats.liveClients = len(liveClients)
-
-    def messageReceived(self, message: bytes):
-        log.info("got message %s" % message)
-        # this is just a leftover test?
-        self.sendMessage(message.decode('utf8'))
-
-
-liveClients: Set[Live] = set()
-stats.liveClients = len(liveClients)
-
-def sendToLiveClients(d: Optional[Dict]=None, asJson: Optional[str]=None):
-    msg: str = asJson or json.dumps(d)
-    assert isinstance(msg, str), repr(msg)
-    for c in liveClients:
-        c.sendMessage(msg)
-
-
 class NoExts(cyclone.web.StaticFileHandler):
     # .html pages can be get() without .html on them
     def get(self, path, *args, **kw):
@@ -480,10 +377,7 @@
     reactor.listenTCP(
         port,
         cyclone.web.Application(handlers=[
-            (r'/live', Live),
             (r'/graph', GraphResource),
-            (r'/patches', Patches),
-            (r'/graphClients', GraphClients),
             (r'/syncedGraph', WebsocketClient),
             (r'/prefixes', Prefixes),
            (r'/stats/(.*)', StatsHandler, {'serverName': 'rdfdb'}),
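The heart of the new `Db._syncPatchToOtherClients` above is an echo-suppression rule: every connected client receives each patch except the one whose `connectionId` matches the sender, because that client applied the patch to its own graph before sending it. Here is a self-contained toy model of that rule, not from the changeset; `FakeClient` and the sample ids are illustrative only.

```python
# Toy model of the fan-out rule in Db._syncPatchToOtherClients (illustrative).
from dataclasses import dataclass, field
from typing import List


@dataclass
class FakeClient:
    connectionId: str
    received: List[str] = field(default_factory=list)

    def sendPatch(self, p: str) -> None:
        self.received.append(p)


def sync_to_other_clients(clients: List[FakeClient], patch: str, sender: str) -> None:
    for c in clients:
        if c.connectionId == sender:
            continue  # sender already applied this patch locally before sending it
        c.sendPatch(patch)


clients = [FakeClient('WS0'), FakeClient('WS1'), FakeClient('WS2')]
sync_to_other_clients(clients, '{"patch": "..."}', sender='WS1')
assert [len(c.received) for c in clients] == [1, 0, 1]
```

This rule is why `Db.patch` gained the `sender` argument in place of the old `senderUpdateUri` attribute that used to be stuffed onto the patch object.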
--- a/rdfdb/syncedgraph.py	Fri May 31 21:39:02 2019 +0000
+++ b/rdfdb/syncedgraph.py	Fri May 31 21:40:00 2019 +0000
@@ -11,27 +11,23 @@
 GraphEditApi - methods to write patches to the graph for common
 operations, e.g. replacing a value, or editing a mapping
 
-PatchReceiver - our web server that listens to edits from the master graph
-
-PatchSender - collects and transmits your graph edits
+WsClientProtocol one connection with the rdfdb server.
 """
 import json, logging, traceback
 from typing import Optional
+import urllib.parse
 
-from rdflib import ConjunctiveGraph, URIRef
-from twisted.internet import defer
-import socket
-import treq
-import autobahn.twisted.websocket
-from twisted.internet import reactor
 from rdfdb.autodepgraphapi import AutoDepGraphApi
 from rdfdb.currentstategraphapi import CurrentStateGraphApi
 from rdfdb.grapheditapi import GraphEditApi
 from rdfdb.patch import Patch
-from rdfdb.patchreceiver import PatchReceiver
-from rdfdb.patchsender import PatchSender
 from rdfdb.rdflibpatch import patchQuads
+from rdflib import ConjunctiveGraph, URIRef
+from twisted.internet import defer
+from twisted.internet import reactor
+import autobahn.twisted.websocket
+import treq
 
 # everybody who writes literals needs to get this
 from rdfdb.rdflibpatch_literal import patch
@@ -41,15 +37,46 @@
 
 
-class WsClient(autobahn.twisted.websocket.WebSocketClientProtocol):
-    def __init__(self, sg=0):
+class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
+    def __init__(self, sg):
         super().__init__()
         self.sg = sg
+        self.sg.currentClient = self
+        self.connectionId = None
+
+    def onConnect(self, response):
+        log.info('conn %r', response)
+
     def onOpen(self):
-        print('ws open')
+        log.info('ws open')
+
     def onMessage(self, payload, isBinary):
-        print('on msg')
+        msg = json.loads(payload)
+        if 'connectedAs' in msg:
+            self.connectionId = msg['connectedAs']
+            log.info(f'rdfdb calls us {self.connectionId}')
+        elif 'patch' in msg:
+            p = Patch(jsonRepr=payload.decode('utf8'))
+            log.debug("received patch %s", p.shortSummary())
+            self.sg.onPatchFromDb(p)
+        else:
+            log.warn('unknown msg from websocket: %s...', payload[:32])
+
+    def sendPatch(self, p: Patch):
+        # this is where we could concatenate little patches into a
+        # bigger one. Often, many statements will cancel each
+        # other out.
+
+        # also who's going to accumulate patches when server is down,
+        # or is that not allowed?
+        if self.connectionId is None:
+            raise ValueError("can't send patches before we get an id")
+        body = p.makeJsonRepr()
+        log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
+        self.sendMessage(body.encode('utf8'))
+
+    def onClose(self, wasClean, code, reason):
+        log.info("WebSocket connection closed: {0}".format(reason))
+
 
 class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
     """
@@ -86,15 +113,7 @@
 
         receiverHost is the hostname other nodes can use to talk to me
         """
-        # get that reonnecting agent
-        factory = autobahn.twisted.websocket.WebSocketClientFactory()
-        factory.protocol = WsClient
-
-        reactor.connectTCP("127.0.0.1", 8209, factory) # need the path of /patches
-
-        #if receiverHost is None:
-        #    receiverHost = socket.gethostname()
-
+        self.connectSocket(rdfdbRoot)
         self.rdfdbRoot = rdfdbRoot
         self.initiallySynced: defer.Deferred[None] = defer.Deferred()
         self._graph = ConjunctiveGraph()
@@ -108,6 +127,18 @@
         # this needs more state to track if we're doing a resync (and
         # everything has to error or wait) or if we're live
 
+    def connectSocket(self, rdfdbRoot: URIRef):
+        factory = autobahn.twisted.websocket.WebSocketClientFactory(
+            rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
+            # Don't know if this is required by spec, but
+            # cyclone.websocket breaks with no origin header.
+            origin='foo')
+        factory.protocol = lambda: WsClientProtocol(self)
+
+        rr = urllib.parse.urlparse(rdfdbRoot)
+        reactor.connectTCP(rr.hostname.encode('ascii'), rr.port, factory)
+        #WsClientProtocol sets our currentClient. Needs rewrite using agents.
+
     def resync(self):
         """
         get the whole graph again from the server (e.g. we had a
@@ -123,17 +154,12 @@
         UIs who want to show that we're doing a resync.
         """
         log.info('resync')
-        self._sender.cancelAll()
-        # this should be locked so only one resync goes on at once
-        return treq.get(self.rdfdbRoot.toPython() + "graph",
-                        headers={
-                            b'Accept': [b'x-trig']
-                        },
-                        ).addCallback(self._resyncGraph)
+        self.currentClient.dropConnection()
 
     def _resyncGraph(self, response):
         log.warn("new graph in")
+        self.currentClient.dropConnection()
 
         #diff against old entire graph
         #broadcast that change
@@ -151,18 +177,15 @@
         debugKey = '[id=%s]' % (id(p) % 1000)
         log.debug("\napply local patch %s %s", debugKey, p)
         try:
-            patchQuads(self._graph,
-                       deleteQuads=p.delQuads,
-                       addQuads=p.addQuads,
-                       perfect=True)
+            self._applyPatchLocally(p)
         except ValueError as e:
             log.error(e)
-            self.sendFailed(None)
+            self.resync()
             return
         log.debug('runDepsOnNewPatch')
         self.runDepsOnNewPatch(p)
         log.debug('sendPatch')
-        self._sender.sendPatch(p).addErrback(self.sendFailed)
+        self.currentClient.sendPatch(p)
         log.debug('patch is done %s', debugKey)
 
     def suggestPrefixes(self, ctx, prefixes):
@@ -177,25 +200,17 @@
                       'prefixes': prefixes
                   }).encode('utf8'))
 
-    def sendFailed(self, result):
-        """
-        we asked for a patch to be queued and sent to the master, and
-        that ultimately failed because of a conflict
-        """
-        log.warn("sendFailed")
-        self.resync()
+    def _applyPatchLocally(self, p: Patch):
+        # .. and disconnect on failure
+        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        log.debug("graph now has %s statements" % len(self._graph))
 
-        #i think we should receive back all the pending patches,
-        #do a resync here,
-        #then requeue all the pending patches (minus the failing one?) after that's done.
-
-    def _onPatch(self, p):
+    def onPatchFromDb(self, p):
         """
         central server has sent us a patch
         """
-        log.debug('_onPatch server has sent us %s', p)
-        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        log.debug("graph now has %s statements" % len(self._graph))
+        log.debug('server has sent us %s', p)
+        self._applyPatchLocally(p)
         try:
             self.runDepsOnNewPatch(p)
         except Exception:
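`connectSocket` in the new syncedgraph.py notes that it "Needs rewrite using agents", and nothing reconnects yet when the server drops the socket (which the server now does deliberately to force an out-of-sync client to start over). A common Twisted pattern for that gap, sketched here only as a possibility and not as part of the changeset, is to mix `ReconnectingClientFactory` into autobahn's client factory:

```python
# Possible reconnect strategy for SyncedGraph.connectSocket (not in this changeset).
from autobahn.twisted.websocket import WebSocketClientFactory
from twisted.internet.protocol import ReconnectingClientFactory


class ReconnectingWsClientFactory(WebSocketClientFactory, ReconnectingClientFactory):
    maxDelay = 10  # cap the exponential retry backoff at 10 seconds

    def clientConnectionFailed(self, connector, reason):
        # server not reachable yet: keep retrying with backoff
        self.retry(connector)

    def clientConnectionLost(self, connector, reason):
        # server dropped us (treated as 'out of sync'): reconnect and resync
        self.retry(connector)
```

On a successful `onOpen` the protocol would call `self.factory.resetDelay()`, and SyncedGraph would still need to clear its local graph and wait for the server's full-graph patch after each reconnect.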