changeset 65:9bc9de580033

big rewrite of rdfdb and syncedgraph to use a single websocket connecton Ignore-this: 659d5fab58791d5d570555d095e4ad39 (initiated by syncedgraph) and sending patches both ways on that instead of having two open ports and a lot more interactions to worry about.
author Drew Perttula <drewp@bigasterisk.com>
date Fri, 31 May 2019 21:40:00 +0000
parents c1a9403e5d21
children 109fefea80a7
files rdfdb/patchreceiver.py rdfdb/patchsender.py rdfdb/service.py rdfdb/syncedgraph.py
diffstat 4 files changed, 131 insertions(+), 444 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/patchreceiver.py	Fri May 31 21:39:02 2019 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,81 +0,0 @@
-import logging, traceback, urllib.request, urllib.parse, urllib.error
-
-from twisted.internet import reactor
-import cyclone.web
-import treq
-
-from rdfdb.patch import Patch
-
-log = logging.getLogger('syncedgraph')
-
-
-class PatchReceiver(object):
-    """
-    runs a web server in this process and registers it with the rdfdb
-    master. See onPatch for what happens when the rdfdb master sends
-    us a patch
-    """
-
-    def __init__(self, rdfdbRoot, host, label, onPatch):
-        """
-        label is what we'll call ourselves to the rdfdb server
-
-        onPatch is what we call back when the server sends a patch
-        """
-        self.rdfdbRoot = rdfdbRoot
-        listen = reactor.listenTCP(
-            0,
-            cyclone.web.Application(handlers=[
-                (r'/update', makePatchEndpoint(onPatch)),
-            ]))
-        port = listen._realPortNumber  # what's the right call for this?
-
-        self.updateResource = 'http://%s:%s/update' % (host, port)
-        log.info("listening on %s" % port)
-        self._register(label)
-
-    def _register(self, label):
-        url = (self.rdfdbRoot + 'graphClients').encode('utf8')
-        body = urllib.parse.urlencode([(b'clientUpdate', self.updateResource),
-                                       (b'label', label)]).encode('utf8')
-        treq.post(url, data=body, headers={
-            b'Content-Type': [b'application/x-www-form-urlencoded']
-        }).addCallbacks(
-            self._done,
-            lambda err: self._registerError(err, url, body)
-        )
-        log.info("registering with rdfdb at %s", url)
-
-    def _registerError(self, err, url, body):
-        log.error('registering to url=%r body=%r', url, body)
-        log.error(err)
-
-    def _done(self, x):
-        log.debug("registered with rdfdb")
-
-
-def makePatchEndpointPutMethod(cb):
-
-    def put(self) -> None:
-        assert isinstance(self, cyclone.web.RequestHandler)
-        try:
-            p = Patch(jsonRepr=self.request.body.decode('utf8'))
-            log.debug("received patch -%d +%d" %
-                      (len(p.delGraph), len(p.addGraph)))
-            cb(p)
-        except Exception as e:
-            self.set_status(500)
-            # a string that will look good in rdfdb's log
-            self.write(repr(e))
-            traceback.print_exc()
-            raise
-
-    return put
-
-
-def makePatchEndpoint(cb):
-
-    class Update(cyclone.web.RequestHandler):
-        put = makePatchEndpointPutMethod(cb)
-
-    return Update
--- a/rdfdb/patchsender.py	Fri May 31 21:39:02 2019 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,141 +0,0 @@
-import logging, time
-from typing import List, Tuple, Optional
-
-from rdflib import URIRef
-from twisted.internet import defer
-import treq
-
-from rdfdb.patch import Patch
-
-log = logging.getLogger('patchsender')
-
-SendResult = defer.Deferred  # to None
-
-
-class PatchSender(object):
-    """
-    SyncedGraph may generate patches faster than we can send
-    them. This object buffers and may even collapse patches before
-    they go the server
-    """
-
-    def __init__(self, target: URIRef, myUpdateResource):
-        """
-        target is the URI we'll send patches to
-
-        myUpdateResource is the URI for this sender of patches, which
-        maybe needs to be the actual requestable update URI for
-        sending updates back to us
-        """
-        self.target = target
-        self.myUpdateResource = myUpdateResource
-        self._patchesToSend: List[Tuple[Patch, SendResult]] = []
-        self._currentSendPatchRequest: Optional[SendResult] = None
-
-    def sendPatch(self, p: Patch) -> SendResult:
-        sendResult: SendResult = defer.Deferred()
-        self._patchesToSend.append((p, sendResult))
-        self._continueSending()
-        return sendResult
-
-    def cancelAll(self):
-        self._patchesToSend[:] = []
-        # we might be in the middle of a post; ideally that would be
-        # aborted, too. Since that's not coded yet, or it might be too late to
-        # abort, what should happen?
-        # 1. this could return deferred until we think our posts have stopped
-        # 2. or, other code could deal for the fact that cancelAll
-        # isn't perfect
-
-    def _continueSending(self) -> None:
-        if not self._patchesToSend or self._currentSendPatchRequest:
-            return
-        if len(self._patchesToSend) > 1:
-            log.debug("%s patches left to send", len(self._patchesToSend))
-            # this is where we could concatenate little patches into a
-            # bigger one. Often, many statements will cancel each
-            # other out. not working yet:
-            if 0:
-                p = self._patchesToSend[0].concat(self._patchesToSend[1:])
-                print("concat down to")
-                print('dels')
-                for q in p.delQuads:
-                    print(q)
-                print('adds')
-                for q in p.addQuads:
-                    print(q)
-                print("----")
-            else:
-                p, sendResult = self._patchesToSend.pop(0)
-        else:
-            p, sendResult = self._patchesToSend.pop(0)
-
-        self._currentSendPatchRequest = sendPatch(
-            self.target, p, senderUpdateUri=self.myUpdateResource)
-        self._currentSendPatchRequest.addCallbacks(self._sendPatchDone,
-                                                   self._sendPatchErr)
-        self._currentSendPatchRequest.chainDeferred(sendResult)
-
-    def _sendPatchDone(self, result):
-        self._currentSendPatchRequest = None
-        self._continueSending()
-
-    def _sendPatchErr(self, e):
-        self._currentSendPatchRequest = None
-        # we're probably out of sync with the master now, since
-        # SyncedGraph.patch optimistically applied the patch to our
-        # local graph already. What happens to this patch? What
-        # happens to further pending patches? Some of the further
-        # patches, especially, may be commutable with the bad one and
-        # might still make sense to apply to the master graph.
-
-        # if someday we are folding pending patches together, this
-        # would be the time to UNDO that and attempt the original
-        # separate patches again
-
-        # this should screen for 409 conflict responses and raise a
-        # special exception for that, so SyncedGraph.sendFailed can
-        # screen for only that type
-
-        # this code is going away; we're going to raise an exception that contains all the pending patches
-        log.error("_sendPatchErr")
-        log.error(e)
-        self._continueSending()
-
-
-def sendPatch(putUri: URIRef, patch: Patch, **kw) -> defer.Deferred:
-    """
-    PUT a patch as json to an http server. Returns deferred.
-    
-    kwargs will become extra attributes in the toplevel json object
-    """
-    t1 = time.time()
-    body = patch.makeJsonRepr(kw)
-    jsonTime = time.time() - t1
-    intro = body[:200]
-    if len(body) > 200:
-        intro = intro + "..."
-    log.debug("send body (rendered %.1fkB in %.1fms): %s",
-              len(body) / 1024, jsonTime * 1000, intro)
-    sendTime = time.time()
-
-    def putDone(done):
-        if not str(done.code).startswith('2'):
-            def fail(content):
-                log.warn(f"Sent patch to {putUri}:")
-                log.warn(str(patch))
-                log.warn(f"Receiver failed {done.code} {content}")
-                raise ValueError("sendPatch failed")
-            return done.content().addCallback(fail)
-
-        dt = 1000 * (time.time() - sendTime)
-        log.debug("sendPatch to %s took %.1fms" % (putUri, dt))
-        return done
-
-    return treq.put(putUri.toPython(),
-                    data=body.encode('utf8'),
-                    headers={
-                        b'Content-Type': [b'application/json']
-                    },
-                    timeout=2,
-    ).addCallback(putDone)
--- a/rdfdb/service.py	Fri May 31 21:39:02 2019 +0000
+++ b/rdfdb/service.py	Fri May 31 21:40:00 2019 +0000
@@ -1,9 +1,9 @@
 import sys, optparse, logging, json, os, time, itertools
-from typing import Callable, Dict, List, Set, Optional, Union
+from typing import Dict, List, Optional
 
 from greplin.scales.cyclonehandler import StatsHandler
 from greplin import scales
-from twisted.internet import reactor, defer, task
+from twisted.internet import reactor, task
 from twisted.internet.inotify import IN_CREATE, INotify
 from twisted.python.failure import Failure
 from twisted.python.filepath import FilePath
@@ -14,8 +14,6 @@
 from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile, DirUriMap
 from rdfdb.graphfile import GraphFile, PatchCb, GetSubgraph
 from rdfdb.patch import Patch, ALLSTMTS
-from rdfdb.patchreceiver import makePatchEndpointPutMethod
-from rdfdb.patchsender import sendPatch
 from rdfdb.rdflibpatch import patchQuads
 
 # move this out
@@ -46,49 +44,6 @@
     pass
 
 
-class Client(object):
-    """
-    one of our syncedgraph clients
-    """
-
-    def __init__(self, updateUri: URIRef, label: str):
-        self.label = label
-        # todo: updateUri is used publicly to compare clients. Replace
-        # it with Client.__eq__ so WsClient doesn't have to fake an
-        # updateUri.
-        self.updateUri = updateUri
-
-    def __repr__(self):
-        return "<%s client at %s>" % (self.label, self.updateUri)
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        """
-        returns deferred. error will be interpreted as the client being
-        broken.
-        """
-        return sendPatch(self.updateUri, p)
-
-
-class WsClient(object):
-
-    def __init__(self, connectionId: str, sendMessage: Callable[[str], None]):
-        self.updateUri = URIRef(connectionId)
-        self.sendMessage = sendMessage
-
-    def __repr__(self):
-        return "<WsClient %s>" % self.updateUri
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        self.sendMessage(p.makeJsonRepr())
-        return defer.succeed(None)
-
-
-def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None:
-    """send the client the whole graph contents"""
-    log.info("sending all graphs to %r..." % client)
-    client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[]))
-    log.info("...sent.")
-
 
 class WatchedFiles(object):
     """
@@ -220,13 +175,59 @@
             self.graphFiles[ctx].dirty(g)
 
 
+_wsClientSerial = itertools.count(0)
+
+
+class WebsocketClient(cyclone.websocket.WebSocketHandler):
+    """
+    Send patches to the client (starting with a client who has 0
+    statements) to keep it in sync with the graph.
+
+    Accept patches from the client, and assume that the client has
+    already applied them to its local graph.
+
+    Treat a disconnect as 'out of sync'. Either the client thinks it
+    is out of sync and wants to start over, or we can't apply a patch
+    correctly therefore we disconnect to make the client start over.
+
+    This socket may also carry some special messages meant for the
+    rdfdb web UI, e.g. about who is connected, etc.
+    """
+    connectionId: str
+
+    def connectionMade(self, *args, **kwargs) -> None:
+        self.connectionId = f'WS{next(_wsClientSerial)}'
+
+        self.sendMessage(json.dumps({'connectedAs': self.connectionId}))
+        log.info("new ws client %r", self)
+        self.settings.db.addClient(self)
+
+    def connectionLost(self, reason):
+        log.info("bye ws client %r: %s", self, reason)
+        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
+                                       self)
+
+    def messageReceived(self, message: bytes):
+        if message == b'PING':
+            self.sendMessage('PONG')
+            return
+        log.debug("got message from %r: %s", self, message[:32])
+        p = Patch(jsonRepr=message.decode('utf8'))
+        self.settings.db.patch(p, sender=self.connectionId)
+
+    def sendPatch(self, p: Patch):
+        self.sendMessage(p.makeJsonRepr())
+
+    def __repr__(self):
+        return f"<SyncedGraph client {self.connectionId}>"
+
 class Db(object):
     """
     the master graph, all the connected clients, all the files we're watching
     """
 
     def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
-        self.clients: List[Union[Client, WsClient]] = []
+        self.clients: List[WebsocketClient] = []
         self.graph = ConjunctiveGraph()
         stats.graphLen = len(self.graph)
         stats.clients = len(self.clients)
@@ -237,50 +238,44 @@
         self.summarizeToLog()
 
     @graphStats.patchFps.rate()
-    def patch(self, patch: Patch, dueToFileChange: bool = False) -> None:
+    def patch(self, patch: Patch, sender: Optional[str]=None, dueToFileChange: bool = False) -> None:
         """
         apply this patch to the master graph then notify everyone about it
 
         dueToFileChange if this is a patch describing an edit we read
         *from* the file (such that we shouldn't write it back to the file)
-
-        if p has a senderUpdateUri attribute, we won't send this patch
-        back to the sender with that updateUri
         """
         ctx = patch.getContext()
         log.info("patching graph %s -%d +%d" %
                  (ctx, len(patch.delQuads), len(patch.addQuads)))
 
-        if hasattr(self, 'watchedFiles'):  # not available during startup
-            self.watchedFiles.aboutToPatch(ctx)
+        self.watchedFiles.aboutToPatch(ctx)
 
+        # an error here needs to drop the sender, and reset everyone
+        # else if we can't rollback the failing patch.
         patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
         stats.graphLen = len(self.graph)
-        self._sendPatch(patch)
+        
+        self._syncPatchToOtherClients(patch, sender)
         if not dueToFileChange:
             self.watchedFiles.dirtyFiles([ctx])
-        sendToLiveClients(asJson=patch.jsonRepr)
         graphStats.statements = len(self.graph)
 
-    def _sendPatch(self, p: Patch):
-        senderUpdateUri: Optional[URIRef] = getattr(p, 'senderUpdateUri', None)
-
+    def _syncPatchToOtherClients(self, p: Patch, sender: str):
         for c in self.clients:
-            if c.updateUri == senderUpdateUri:
+            if c.connectionId == sender:
                 # this client has self-applied the patch already
-                log.debug("_sendPatch: don't resend to %r", c)
+                log.debug("_syncPatchToOtherClients: don't resend to %r", c)
                 continue
-            log.debug('_sendPatch: send to %r', c)
-            d = c.sendPatch(p)
-            d.addErrback(self.clientErrored, c)
-
+            log.debug('_syncPatchToOtherClients: send to %r', c)
+            c.sendPatch(p)
+            
     def clientErrored(self, err, c) -> None:
         err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
         log.info("%r %r - dropping client", c, err.getErrorMessage())
         if c in self.clients:
             self.clients.remove(c)
         stats.clients = len(self.clients)
-        self.sendClientsToAllLivePages()
 
     def summarizeToLog(self):
         log.info("contexts in graph (%s total stmts):" % len(self.graph))
@@ -304,25 +299,13 @@
             g.add(s)
         return g
 
-    def addClient(self, newClient: Union[Client, WsClient]) -> None:
-        for c in self.clients:
-            if c.updateUri == newClient.updateUri:
-                self.clients.remove(c)
+    def addClient(self, newClient: WebsocketClient) -> None:
+        log.info("new connection: sending all graphs to %r..." % newClient)
+        newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
 
-        log.info("new client %r" % newClient)
-        sendGraphToClient(self.graph, newClient)
         self.clients.append(newClient)
-        self.sendClientsToAllLivePages()
         stats.clients = len(self.clients)
 
-    def sendClientsToAllLivePages(self) -> None:
-        sendToLiveClients({
-            "clients": [
-                dict(updateUri=c.updateUri.toPython(), label=repr(c))
-                for c in self.clients
-            ]
-        })
-
 
 class GraphResource(cyclone.web.RequestHandler):
 
@@ -344,32 +327,6 @@
         self.write(self.settings.db.graph.serialize(format=format))
 
 
-class Patches(cyclone.web.RequestHandler):
-
-    def __init__(self, *args, **kw):
-        cyclone.web.RequestHandler.__init__(self, *args, **kw)
-        p = makePatchEndpointPutMethod(self.settings.db.patch)
-        self.put = lambda: p(self)
-
-    def get(self):
-        pass
-
-
-class GraphClients(cyclone.web.RequestHandler):
-
-    def get(self):
-        pass
-
-    def post(self) -> None:
-        upd = URIRef(self.get_argument("clientUpdate"))
-        try:
-            self.settings.db.addClient(Client(upd, self.get_argument("label")))
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-
-
 class Prefixes(cyclone.web.RequestHandler):
 
     def post(self):
@@ -379,66 +336,6 @@
                                 {}).update(suggestion['prefixes'])
 
 
-_wsClientSerial = itertools.count(0)
-
-
-class WebsocketClient(cyclone.websocket.WebSocketHandler):
-
-    wsClient: Optional[WsClient] = None
-
-    def connectionMade(self, *args, **kwargs) -> None:
-        connectionId = f'WS{next(_wsClientSerial)}'
-
-        self.wsClient = WsClient(connectionId, self.sendMessage)
-        self.sendMessage(json.dumps({'connectedAs': connectionId}))
-        log.info("new ws client %r", self.wsClient)
-        self.settings.db.addClient(self.wsClient)
-
-    def connectionLost(self, reason):
-        log.info("bye ws client %r", self.wsClient)
-        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
-                                       self.wsClient)
-
-    def messageReceived(self, message: bytes):
-        if message == b'PING':
-            self.sendMessage('PONG')
-            return
-        log.info("got message from %r: %s", self.wsClient, message)
-        p = Patch(jsonRepr=message.decode('utf8'))
-        assert self.wsClient is not None
-        p.senderUpdateUri = self.wsClient.updateUri
-        self.settings.db.patch(p)
-
-
-class Live(cyclone.websocket.WebSocketHandler):
-
-    def connectionMade(self, *args, **kwargs):
-        log.info("websocket opened")
-        liveClients.add(self)
-        stats.liveClients = len(liveClients)
-        self.settings.db.sendClientsToAllLivePages()
-
-    def connectionLost(self, reason):
-        log.info("websocket closed")
-        liveClients.remove(self)
-        stats.liveClients = len(liveClients)
-
-    def messageReceived(self, message: bytes):
-        log.info("got message %s" % message)
-        # this is just a leftover test?
-        self.sendMessage(message.decode('utf8'))
-
-
-liveClients: Set[Live] = set()
-stats.liveClients = len(liveClients)
-
-def sendToLiveClients(d: Optional[Dict]=None, asJson: Optional[str]=None):
-    msg: str = asJson or json.dumps(d)
-    assert isinstance(msg, str), repr(msg)
-    for c in liveClients:
-        c.sendMessage(msg)
-
-
 class NoExts(cyclone.web.StaticFileHandler):
     # .html pages can be get() without .html on them
     def get(self, path, *args, **kw):
@@ -480,10 +377,7 @@
     reactor.listenTCP(
         port,
         cyclone.web.Application(handlers=[
-            (r'/live', Live),
             (r'/graph', GraphResource),
-            (r'/patches', Patches),
-            (r'/graphClients', GraphClients),
             (r'/syncedGraph', WebsocketClient),
             (r'/prefixes', Prefixes),
             (r'/stats/(.*)', StatsHandler, {'serverName': 'rdfdb'}),
--- a/rdfdb/syncedgraph.py	Fri May 31 21:39:02 2019 +0000
+++ b/rdfdb/syncedgraph.py	Fri May 31 21:40:00 2019 +0000
@@ -11,27 +11,23 @@
 GraphEditApi - methods to write patches to the graph for common
 operations, e.g. replacing a value, or editing a mapping
 
-PatchReceiver - our web server that listens to edits from the master graph
-
-PatchSender - collects and transmits your graph edits
+WsClientProtocol one connection with the rdfdb server.
 """
 
 import json, logging, traceback
 from typing import Optional
+import urllib.parse
 
-from rdflib import ConjunctiveGraph, URIRef
-from twisted.internet import defer
-import socket
-import treq
-import autobahn.twisted.websocket
-from twisted.internet import reactor
 from rdfdb.autodepgraphapi import AutoDepGraphApi
 from rdfdb.currentstategraphapi import CurrentStateGraphApi
 from rdfdb.grapheditapi import GraphEditApi
 from rdfdb.patch import Patch
-from rdfdb.patchreceiver import PatchReceiver
-from rdfdb.patchsender import PatchSender
 from rdfdb.rdflibpatch import patchQuads
+from rdflib import ConjunctiveGraph, URIRef
+from twisted.internet import defer
+from twisted.internet import reactor
+import autobahn.twisted.websocket
+import treq
 
 # everybody who writes literals needs to get this
 from rdfdb.rdflibpatch_literal import patch
@@ -41,15 +37,46 @@
 
 
 
-class WsClient(autobahn.twisted.websocket.WebSocketClientProtocol):
-    def __init__(self, sg=0):
+class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
+    def __init__(self, sg):
         super().__init__()
         self.sg = sg
+        self.sg.currentClient = self
+        self.connectionId = None
+
+    def onConnect(self, response):
+        log.info('conn %r', response)
+
     def onOpen(self):
-        print('ws open')
+        log.info('ws open')
+        
     def onMessage(self, payload, isBinary):
-        print('on msg')
+        msg = json.loads(payload)
+        if 'connectedAs' in msg:
+            self.connectionId = msg['connectedAs']
+            log.info(f'rdfdb calls us {self.connectionId}')
+        elif 'patch' in msg:
+            p = Patch(jsonRepr=payload.decode('utf8'))
+            log.debug("received patch %s", p.shortSummary())
+            self.sg.onPatchFromDb(p)
+        else:
+            log.warn('unknown msg from websocket: %s...', payload[:32])
 
+    def sendPatch(self, p: Patch):
+        # this is where we could concatenate little patches into a
+        # bigger one. Often, many statements will cancel each
+        # other out.
+
+        # also who's going to accumulate patches when server is down,
+        # or is that not allowed?
+        if self.connectionId is None:
+            raise ValueError("can't send patches before we get an id")
+        body = p.makeJsonRepr()
+        log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
+        self.sendMessage(body.encode('utf8'))
+
+    def onClose(self, wasClean, code, reason):
+        log.info("WebSocket connection closed: {0}".format(reason))
 
 class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
     """
@@ -86,15 +113,7 @@
         receiverHost is the hostname other nodes can use to talk to me
         """
 
-        # get that reonnecting agent
-        factory = autobahn.twisted.websocket.WebSocketClientFactory()
-        factory.protocol = WsClient
-
-        reactor.connectTCP("127.0.0.1", 8209, factory) #  need the path of /patches
-        
-        #if receiverHost is None:
-        #    receiverHost = socket.gethostname()
-
+        self.connectSocket(rdfdbRoot)
         self.rdfdbRoot = rdfdbRoot
         self.initiallySynced: defer.Deferred[None] = defer.Deferred()
         self._graph = ConjunctiveGraph()
@@ -108,6 +127,18 @@
         # this needs more state to track if we're doing a resync (and
         # everything has to error or wait) or if we're live
 
+    def connectSocket(self, rdfdbRoot: URIRef):
+        factory = autobahn.twisted.websocket.WebSocketClientFactory(
+            rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
+            # Don't know if this is required by spec, but
+            # cyclone.websocket breaks with no origin header.
+            origin='foo')
+        factory.protocol = lambda: WsClientProtocol(self)
+
+        rr = urllib.parse.urlparse(rdfdbRoot)
+        reactor.connectTCP(rr.hostname.encode('ascii'), rr.port, factory)
+        #WsClientProtocol sets our currentClient. Needs rewrite using agents.
+
     def resync(self):
         """
         get the whole graph again from the server (e.g. we had a
@@ -123,17 +154,12 @@
         UIs who want to show that we're doing a resync.
         """
         log.info('resync')
-        self._sender.cancelAll()
-        # this should be locked so only one resync goes on at once
-        return treq.get(self.rdfdbRoot.toPython() + "graph",
-            headers={
-                b'Accept': [b'x-trig']
-            },
-        ).addCallback(self._resyncGraph)
+        self.currentClient.dropConnection()
 
     def _resyncGraph(self, response):
         log.warn("new graph in")
 
+        self.currentClient.dropConnection()
         #diff against old entire graph
         #broadcast that change
 
@@ -151,18 +177,15 @@
         debugKey = '[id=%s]' % (id(p) % 1000)
         log.debug("\napply local patch %s %s", debugKey, p)
         try:
-            patchQuads(self._graph,
-                       deleteQuads=p.delQuads,
-                       addQuads=p.addQuads,
-                       perfect=True)
+            self._applyPatchLocally(p)
         except ValueError as e:
             log.error(e)
-            self.sendFailed(None)
+            self.resync()
             return
         log.debug('runDepsOnNewPatch')
         self.runDepsOnNewPatch(p)
         log.debug('sendPatch')
-        self._sender.sendPatch(p).addErrback(self.sendFailed)
+        self.currentClient.sendPatch(p)
         log.debug('patch is done %s', debugKey)
 
     def suggestPrefixes(self, ctx, prefixes):
@@ -177,25 +200,17 @@
                       'prefixes': prefixes
                   }).encode('utf8'))
 
-    def sendFailed(self, result):
-        """
-        we asked for a patch to be queued and sent to the master, and
-        that ultimately failed because of a conflict
-        """
-        log.warn("sendFailed")
-        self.resync()
+    def _applyPatchLocally(self, p: Patch):
+        # .. and disconnect on failure
+        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        log.debug("graph now has %s statements" % len(self._graph))
 
-        #i think we should receive back all the pending patches,
-        #do a resync here,
-        #then requeue all the pending patches (minus the failing one?) after that's done.
-
-    def _onPatch(self, p):
+    def onPatchFromDb(self, p):
         """
         central server has sent us a patch
         """
-        log.debug('_onPatch server has sent us %s', p)
-        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        log.debug("graph now has %s statements" % len(self._graph))
+        log.debug('server has sent us %s', p)
+        self._applyPatchLocally(p)
         try:
             self.runDepsOnNewPatch(p)
         except Exception: