# HG changeset patch # User drewp@bigasterisk.com # Date 1685234070 25200 # Node ID 91b0a82df6e007a65a0cf0beeb32aa4e5b83b0d6 # Parent 6342adc438c59f2f26506ae5466be4700d931988 rewrite SyncedGraph ws error handling diff -r 6342adc438c5 -r 91b0a82df6e0 rdfdb/syncedgraph/syncedgraph_base.py --- a/rdfdb/syncedgraph/syncedgraph_base.py Sat May 27 17:33:11 2023 -0700 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Sat May 27 17:34:30 2023 -0700 @@ -58,6 +58,8 @@ # reactor = cast(IReactorCore, twisted.internet.reactor) +class _CorruptionNeedResync(ValueError): + """we cannot proceed with this session (connection) to the server""" class SyncedGraphBase(object): """ @@ -104,6 +106,12 @@ async def _communicate(self): while True: + await self._runOneServerSession() + log.info("lost connection- retry") + await asyncio.sleep(4) + + async def _runOneServerSession(self): + try: async with aiohttp.ClientSession() as sess: async with sess.ws_connect(self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph') as ws: self.ws = ws @@ -113,10 +121,9 @@ except Exception: traceback.print_exc() raise - - await self._lostRdfdbConnection() - log.info("lost connection- retry") - await asyncio.sleep(4) + except _CorruptionNeedResync: + log.warning("_CorruptionNeedResync") + await self._lostRdfdbConnection() async def _onIncomingMsg(self, body: str): j = json.loads(body) @@ -139,26 +146,6 @@ log.info(f'cleared graph to {len(self._graph)} (should be 0)') log.error('graph is not updating- you need to restart') - async def _resync(self): - """ - get the whole graph again from the server (e.g. we had a - conflict while applying a patch and want to return to the - truth). - - To avoid too much churn, we remember our old graph and diff it - against the replacement. This way, our callers only see the - corrections. - - Edits you make during a resync will surely be lost, so I - should just fail them. There should be a notification back to - UIs who want to show that we're doing a resync. - """ - log.info('resync') - await self.ws.close() - - # diff against old entire graph - # broadcast that change - async def runDepsOnNewPatch(self, p): # See AutoDepGraphApi pass @@ -167,7 +154,8 @@ """send this patch to the server and apply it to our local graph and run handlers""" - if not self.isConnected or self.currentClient is None: + if not self.isConnected: + # todo: queue these. it's not the caller's problem log.warn("not currently connected- dropping patch") return @@ -184,8 +172,7 @@ self._applyPatchLocally(p) except ValueError as e: log.error(e) - await self._resync() - return + raise _CorruptionNeedResync() log.debug('runDepsOnNewPatch') await self.runDepsOnNewPatch(p) log.debug('sendPatch') @@ -198,6 +185,7 @@ prefixes. async, not guaranteed to finish before any particular file flush """ + raise NotImplementedError await self.httpSession.post(self.rdfdbRoot + 'prefixes', data=json.dumps({'ctx': ctx, 'prefixes': prefixes})) def _applyPatchLocally(self, p: Patch): @@ -219,8 +207,7 @@ self._applyPatchLocally(p) except ValueError as e: log.error(e) - await self._resync() - return + raise _CorruptionNeedResync() try: await self.runDepsOnNewPatch(p)