diff rdfdb/service.py @ 65:9bc9de580033

big rewrite of rdfdb and syncedgraph to use a single websocket connecton Ignore-this: 659d5fab58791d5d570555d095e4ad39 (initiated by syncedgraph) and sending patches both ways on that instead of having two open ports and a lot more interactions to worry about.
author Drew Perttula <drewp@bigasterisk.com>
date Fri, 31 May 2019 21:40:00 +0000
parents dbf47ed931d9
children 109fefea80a7
line wrap: on
line diff
--- a/rdfdb/service.py	Fri May 31 21:39:02 2019 +0000
+++ b/rdfdb/service.py	Fri May 31 21:40:00 2019 +0000
@@ -1,9 +1,9 @@
 import sys, optparse, logging, json, os, time, itertools
-from typing import Callable, Dict, List, Set, Optional, Union
+from typing import Dict, List, Optional
 
 from greplin.scales.cyclonehandler import StatsHandler
 from greplin import scales
-from twisted.internet import reactor, defer, task
+from twisted.internet import reactor, task
 from twisted.internet.inotify import IN_CREATE, INotify
 from twisted.python.failure import Failure
 from twisted.python.filepath import FilePath
@@ -14,8 +14,6 @@
 from rdfdb.file_vs_uri import correctToTopdirPrefix, fileForUri, uriFromFile, DirUriMap
 from rdfdb.graphfile import GraphFile, PatchCb, GetSubgraph
 from rdfdb.patch import Patch, ALLSTMTS
-from rdfdb.patchreceiver import makePatchEndpointPutMethod
-from rdfdb.patchsender import sendPatch
 from rdfdb.rdflibpatch import patchQuads
 
 # move this out
@@ -46,49 +44,6 @@
     pass
 
 
-class Client(object):
-    """
-    one of our syncedgraph clients
-    """
-
-    def __init__(self, updateUri: URIRef, label: str):
-        self.label = label
-        # todo: updateUri is used publicly to compare clients. Replace
-        # it with Client.__eq__ so WsClient doesn't have to fake an
-        # updateUri.
-        self.updateUri = updateUri
-
-    def __repr__(self):
-        return "<%s client at %s>" % (self.label, self.updateUri)
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        """
-        returns deferred. error will be interpreted as the client being
-        broken.
-        """
-        return sendPatch(self.updateUri, p)
-
-
-class WsClient(object):
-
-    def __init__(self, connectionId: str, sendMessage: Callable[[str], None]):
-        self.updateUri = URIRef(connectionId)
-        self.sendMessage = sendMessage
-
-    def __repr__(self):
-        return "<WsClient %s>" % self.updateUri
-
-    def sendPatch(self, p: Patch) -> defer.Deferred:
-        self.sendMessage(p.makeJsonRepr())
-        return defer.succeed(None)
-
-
-def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None:
-    """send the client the whole graph contents"""
-    log.info("sending all graphs to %r..." % client)
-    client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[]))
-    log.info("...sent.")
-
 
 class WatchedFiles(object):
     """
@@ -220,13 +175,59 @@
             self.graphFiles[ctx].dirty(g)
 
 
+_wsClientSerial = itertools.count(0)
+
+
+class WebsocketClient(cyclone.websocket.WebSocketHandler):
+    """
+    Send patches to the client (starting with a client who has 0
+    statements) to keep it in sync with the graph.
+
+    Accept patches from the client, and assume that the client has
+    already applied them to its local graph.
+
+    Treat a disconnect as 'out of sync'. Either the client thinks it
+    is out of sync and wants to start over, or we can't apply a patch
+    correctly therefore we disconnect to make the client start over.
+
+    This socket may also carry some special messages meant for the
+    rdfdb web UI, e.g. about who is connected, etc.
+    """
+    connectionId: str
+
+    def connectionMade(self, *args, **kwargs) -> None:
+        self.connectionId = f'WS{next(_wsClientSerial)}'
+
+        self.sendMessage(json.dumps({'connectedAs': self.connectionId}))
+        log.info("new ws client %r", self)
+        self.settings.db.addClient(self)
+
+    def connectionLost(self, reason):
+        log.info("bye ws client %r: %s", self, reason)
+        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
+                                       self)
+
+    def messageReceived(self, message: bytes):
+        if message == b'PING':
+            self.sendMessage('PONG')
+            return
+        log.debug("got message from %r: %s", self, message[:32])
+        p = Patch(jsonRepr=message.decode('utf8'))
+        self.settings.db.patch(p, sender=self.connectionId)
+
+    def sendPatch(self, p: Patch):
+        self.sendMessage(p.makeJsonRepr())
+
+    def __repr__(self):
+        return f"<SyncedGraph client {self.connectionId}>"
+
 class Db(object):
     """
     the master graph, all the connected clients, all the files we're watching
     """
 
     def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
-        self.clients: List[Union[Client, WsClient]] = []
+        self.clients: List[WebsocketClient] = []
         self.graph = ConjunctiveGraph()
         stats.graphLen = len(self.graph)
         stats.clients = len(self.clients)
@@ -237,50 +238,44 @@
         self.summarizeToLog()
 
     @graphStats.patchFps.rate()
-    def patch(self, patch: Patch, dueToFileChange: bool = False) -> None:
+    def patch(self, patch: Patch, sender: Optional[str]=None, dueToFileChange: bool = False) -> None:
         """
         apply this patch to the master graph then notify everyone about it
 
         dueToFileChange if this is a patch describing an edit we read
         *from* the file (such that we shouldn't write it back to the file)
-
-        if p has a senderUpdateUri attribute, we won't send this patch
-        back to the sender with that updateUri
         """
         ctx = patch.getContext()
         log.info("patching graph %s -%d +%d" %
                  (ctx, len(patch.delQuads), len(patch.addQuads)))
 
-        if hasattr(self, 'watchedFiles'):  # not available during startup
-            self.watchedFiles.aboutToPatch(ctx)
+        self.watchedFiles.aboutToPatch(ctx)
 
+        # an error here needs to drop the sender, and reset everyone
+        # else if we can't rollback the failing patch.
         patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
         stats.graphLen = len(self.graph)
-        self._sendPatch(patch)
+        
+        self._syncPatchToOtherClients(patch, sender)
         if not dueToFileChange:
             self.watchedFiles.dirtyFiles([ctx])
-        sendToLiveClients(asJson=patch.jsonRepr)
         graphStats.statements = len(self.graph)
 
-    def _sendPatch(self, p: Patch):
-        senderUpdateUri: Optional[URIRef] = getattr(p, 'senderUpdateUri', None)
-
+    def _syncPatchToOtherClients(self, p: Patch, sender: str):
         for c in self.clients:
-            if c.updateUri == senderUpdateUri:
+            if c.connectionId == sender:
                 # this client has self-applied the patch already
-                log.debug("_sendPatch: don't resend to %r", c)
+                log.debug("_syncPatchToOtherClients: don't resend to %r", c)
                 continue
-            log.debug('_sendPatch: send to %r', c)
-            d = c.sendPatch(p)
-            d.addErrback(self.clientErrored, c)
-
+            log.debug('_syncPatchToOtherClients: send to %r', c)
+            c.sendPatch(p)
+            
     def clientErrored(self, err, c) -> None:
         err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect)
         log.info("%r %r - dropping client", c, err.getErrorMessage())
         if c in self.clients:
             self.clients.remove(c)
         stats.clients = len(self.clients)
-        self.sendClientsToAllLivePages()
 
     def summarizeToLog(self):
         log.info("contexts in graph (%s total stmts):" % len(self.graph))
@@ -304,25 +299,13 @@
             g.add(s)
         return g
 
-    def addClient(self, newClient: Union[Client, WsClient]) -> None:
-        for c in self.clients:
-            if c.updateUri == newClient.updateUri:
-                self.clients.remove(c)
+    def addClient(self, newClient: WebsocketClient) -> None:
+        log.info("new connection: sending all graphs to %r..." % newClient)
+        newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
 
-        log.info("new client %r" % newClient)
-        sendGraphToClient(self.graph, newClient)
         self.clients.append(newClient)
-        self.sendClientsToAllLivePages()
         stats.clients = len(self.clients)
 
-    def sendClientsToAllLivePages(self) -> None:
-        sendToLiveClients({
-            "clients": [
-                dict(updateUri=c.updateUri.toPython(), label=repr(c))
-                for c in self.clients
-            ]
-        })
-
 
 class GraphResource(cyclone.web.RequestHandler):
 
@@ -344,32 +327,6 @@
         self.write(self.settings.db.graph.serialize(format=format))
 
 
-class Patches(cyclone.web.RequestHandler):
-
-    def __init__(self, *args, **kw):
-        cyclone.web.RequestHandler.__init__(self, *args, **kw)
-        p = makePatchEndpointPutMethod(self.settings.db.patch)
-        self.put = lambda: p(self)
-
-    def get(self):
-        pass
-
-
-class GraphClients(cyclone.web.RequestHandler):
-
-    def get(self):
-        pass
-
-    def post(self) -> None:
-        upd = URIRef(self.get_argument("clientUpdate"))
-        try:
-            self.settings.db.addClient(Client(upd, self.get_argument("label")))
-        except:
-            import traceback
-            traceback.print_exc()
-            raise
-
-
 class Prefixes(cyclone.web.RequestHandler):
 
     def post(self):
@@ -379,66 +336,6 @@
                                 {}).update(suggestion['prefixes'])
 
 
-_wsClientSerial = itertools.count(0)
-
-
-class WebsocketClient(cyclone.websocket.WebSocketHandler):
-
-    wsClient: Optional[WsClient] = None
-
-    def connectionMade(self, *args, **kwargs) -> None:
-        connectionId = f'WS{next(_wsClientSerial)}'
-
-        self.wsClient = WsClient(connectionId, self.sendMessage)
-        self.sendMessage(json.dumps({'connectedAs': connectionId}))
-        log.info("new ws client %r", self.wsClient)
-        self.settings.db.addClient(self.wsClient)
-
-    def connectionLost(self, reason):
-        log.info("bye ws client %r", self.wsClient)
-        self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)),
-                                       self.wsClient)
-
-    def messageReceived(self, message: bytes):
-        if message == b'PING':
-            self.sendMessage('PONG')
-            return
-        log.info("got message from %r: %s", self.wsClient, message)
-        p = Patch(jsonRepr=message.decode('utf8'))
-        assert self.wsClient is not None
-        p.senderUpdateUri = self.wsClient.updateUri
-        self.settings.db.patch(p)
-
-
-class Live(cyclone.websocket.WebSocketHandler):
-
-    def connectionMade(self, *args, **kwargs):
-        log.info("websocket opened")
-        liveClients.add(self)
-        stats.liveClients = len(liveClients)
-        self.settings.db.sendClientsToAllLivePages()
-
-    def connectionLost(self, reason):
-        log.info("websocket closed")
-        liveClients.remove(self)
-        stats.liveClients = len(liveClients)
-
-    def messageReceived(self, message: bytes):
-        log.info("got message %s" % message)
-        # this is just a leftover test?
-        self.sendMessage(message.decode('utf8'))
-
-
-liveClients: Set[Live] = set()
-stats.liveClients = len(liveClients)
-
-def sendToLiveClients(d: Optional[Dict]=None, asJson: Optional[str]=None):
-    msg: str = asJson or json.dumps(d)
-    assert isinstance(msg, str), repr(msg)
-    for c in liveClients:
-        c.sendMessage(msg)
-
-
 class NoExts(cyclone.web.StaticFileHandler):
     # .html pages can be get() without .html on them
     def get(self, path, *args, **kw):
@@ -480,10 +377,7 @@
     reactor.listenTCP(
         port,
         cyclone.web.Application(handlers=[
-            (r'/live', Live),
             (r'/graph', GraphResource),
-            (r'/patches', Patches),
-            (r'/graphClients', GraphClients),
             (r'/syncedGraph', WebsocketClient),
             (r'/prefixes', Prefixes),
             (r'/stats/(.*)', StatsHandler, {'serverName': 'rdfdb'}),