changeset 134:d3d6a1f62222

try to handle patch apply errors. very untested, but it should log some stuff
author drewp@bigasterisk.com
date Mon, 29 May 2023 23:20:55 -0700
parents f11770a0a797
children 605ea6ce409e
files rdfdb/syncedgraph/syncedgraph_base.py
diffstat 1 files changed, 11 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/syncedgraph/syncedgraph_base.py	Mon May 29 22:28:55 2023 -0700
+++ b/rdfdb/syncedgraph/syncedgraph_base.py	Mon May 29 23:20:55 2023 -0700
@@ -2,7 +2,7 @@
 import json
 import logging
 import traceback
-from typing import Any, Optional, cast
+from typing import Any, Callable, Optional, cast
 
 import aiohttp
 from rdfdb.patch import Patch, compactPatches
@@ -100,7 +100,7 @@
 
         self.inbox: asyncio.Queue[Patch] = asyncio.Queue()
 
-        asyncio.create_task(self._processInbox())
+        asyncio.create_task(self._processInbox(self.onApplyErr))
 
         # todo:
         # AutoDepGraphApi.__init__(self)
@@ -108,6 +108,10 @@
         # this needs more state to track if we're doing a resync (and
         # everything has to error or wait) or if we're live
 
+    def onApplyErr(self, exc: Exception):
+        log.warning(f"onApplyErr got {exc!r} - try to reconnect")
+        asyncio.create_task(self.ws.close())
+
     async def _communicate(self):
         while True:
             await self._runOneServerSession()
@@ -214,7 +218,9 @@
                 log.debug('server has sent us: %s', p)
         self.inbox.put_nowait(p)
 
-    async def _processInbox(self):
+    async def _processInbox(self, onApplyErr: Callable):
+        """If there's an error, we keep this task alive and 
+        callback onApplyErr to cause a reconnect"""
         while True:
             pending = [await self.inbox.get()]
             while True:
@@ -233,9 +239,9 @@
                 log.debug('processInbox: applying patch')
                 try:
                     self._applyPatchLocally(p)
-                except ValueError as e:
+                except Exception as e:
                     log.error(e)
-                    raise _CorruptionNeedResync()
+                    onApplyErr(e)
                 
                 try:
                     await self.runDepsOnNewPatch(p)