changeset 125:91b0a82df6e0

rewrite SyncedGraph ws error handling
author drewp@bigasterisk.com
date Sat, 27 May 2023 17:34:30 -0700
parents 6342adc438c5
children 1780382477ed
files rdfdb/syncedgraph/syncedgraph_base.py
diffstat 1 files changed, 16 insertions(+), 29 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/syncedgraph/syncedgraph_base.py	Sat May 27 17:33:11 2023 -0700
+++ b/rdfdb/syncedgraph/syncedgraph_base.py	Sat May 27 17:34:30 2023 -0700
@@ -58,6 +58,8 @@
 
 # reactor = cast(IReactorCore, twisted.internet.reactor)
 
+class _CorruptionNeedResync(ValueError):
+    """we cannot proceed with this session (connection) to the server"""
 
 class SyncedGraphBase(object):
     """
@@ -104,6 +106,12 @@
 
     async def _communicate(self):
         while True:
+            await self._runOneServerSession()
+            log.info("lost connection- retry")
+            await asyncio.sleep(4)
+
+    async def _runOneServerSession(self):
+        try:
             async with aiohttp.ClientSession() as sess:
                 async with sess.ws_connect(self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph') as ws:
                     self.ws = ws
@@ -113,10 +121,9 @@
                         except Exception:
                             traceback.print_exc()
                             raise
-
-            await self._lostRdfdbConnection()
-            log.info("lost connection- retry")
-            await asyncio.sleep(4)
+        except _CorruptionNeedResync:
+            log.warning("_CorruptionNeedResync")
+        await self._lostRdfdbConnection()
 
     async def _onIncomingMsg(self, body: str):
         j = json.loads(body)
@@ -139,26 +146,6 @@
         log.info(f'cleared graph to {len(self._graph)} (should be 0)')
         log.error('graph is not updating- you need to restart')
 
-    async def _resync(self):
-        """
-        get the whole graph again from the server (e.g. we had a
-        conflict while applying a patch and want to return to the
-        truth).
-
-        To avoid too much churn, we remember our old graph and diff it
-        against the replacement. This way, our callers only see the
-        corrections.
-
-        Edits you make during a resync will surely be lost, so I
-        should just fail them. There should be a notification back to
-        UIs who want to show that we're doing a resync.
-        """
-        log.info('resync')
-        await self.ws.close()
-
-        # diff against old entire graph
-        # broadcast that change
-
     async def runDepsOnNewPatch(self, p):
         # See AutoDepGraphApi
         pass
@@ -167,7 +154,8 @@
         """send this patch to the server and apply it to our local
         graph and run handlers"""
 
-        if not self.isConnected or self.currentClient is None:
+        if not self.isConnected:
+            # todo: queue these. it's not the caller's problem
             log.warn("not currently connected- dropping patch")
             return
 
@@ -184,8 +172,7 @@
             self._applyPatchLocally(p)
         except ValueError as e:
             log.error(e)
-            await self._resync()
-            return
+            raise _CorruptionNeedResync()
         log.debug('runDepsOnNewPatch')
         await self.runDepsOnNewPatch(p)
         log.debug('sendPatch')
@@ -198,6 +185,7 @@
         prefixes. async, not guaranteed to finish before any
         particular file flush
         """
+        raise NotImplementedError
         await self.httpSession.post(self.rdfdbRoot + 'prefixes', data=json.dumps({'ctx': ctx, 'prefixes': prefixes}))
 
     def _applyPatchLocally(self, p: Patch):
@@ -219,8 +207,7 @@
             self._applyPatchLocally(p)
         except ValueError as e:
             log.error(e)
-            await self._resync()
-            return
+            raise _CorruptionNeedResync()
         
         try:
             await self.runDepsOnNewPatch(p)