# HG changeset patch # User drewp@bigasterisk.com # Date 1685427655 25200 # Node ID d3d6a1f62222cd8621084bf042d08b99b4d326e0 # Parent f11770a0a797a1046806e0c38051fb7f39dc392b try to handle patch apply errors. very untested, but it should log some stuff diff -r f11770a0a797 -r d3d6a1f62222 rdfdb/syncedgraph/syncedgraph_base.py --- a/rdfdb/syncedgraph/syncedgraph_base.py Mon May 29 22:28:55 2023 -0700 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Mon May 29 23:20:55 2023 -0700 @@ -2,7 +2,7 @@ import json import logging import traceback -from typing import Any, Optional, cast +from typing import Any, Callable, Optional, cast import aiohttp from rdfdb.patch import Patch, compactPatches @@ -100,7 +100,7 @@ self.inbox: asyncio.Queue[Patch] = asyncio.Queue() - asyncio.create_task(self._processInbox()) + asyncio.create_task(self._processInbox(self.onApplyErr)) # todo: # AutoDepGraphApi.__init__(self) @@ -108,6 +108,10 @@ # this needs more state to track if we're doing a resync (and # everything has to error or wait) or if we're live + def onApplyErr(self, exc: Exception): + log.warning(f"onApplyErr got {exc!r} - try to reconnect") + asyncio.create_task(self.ws.close()) + async def _communicate(self): while True: await self._runOneServerSession() @@ -214,7 +218,9 @@ log.debug('server has sent us: %s', p) self.inbox.put_nowait(p) - async def _processInbox(self): + async def _processInbox(self, onApplyErr: Callable): + """If there's an error, we keep this task alive and + callback onApplyErr to cause a reconnect""" while True: pending = [await self.inbox.get()] while True: @@ -233,9 +239,9 @@ log.debug('processInbox: applying patch') try: self._applyPatchLocally(p) - except ValueError as e: + except Exception as e: log.error(e) - raise _CorruptionNeedResync() + onApplyErr(e) try: await self.runDepsOnNewPatch(p)