Mercurial > code > home > repos > rdfdb
changeset 125:91b0a82df6e0
rewrite SyncedGraph ws error handling
author | drewp@bigasterisk.com |
---|---|
date | Sat, 27 May 2023 17:34:30 -0700 |
parents | 6342adc438c5 |
children | 1780382477ed |
files | rdfdb/syncedgraph/syncedgraph_base.py |
diffstat | 1 files changed, 16 insertions(+), 29 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/syncedgraph/syncedgraph_base.py Sat May 27 17:33:11 2023 -0700 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Sat May 27 17:34:30 2023 -0700 @@ -58,6 +58,8 @@ # reactor = cast(IReactorCore, twisted.internet.reactor) +class _CorruptionNeedResync(ValueError): + """we cannot proceed with this session (connection) to the server""" class SyncedGraphBase(object): """ @@ -104,6 +106,12 @@ async def _communicate(self): while True: + await self._runOneServerSession() + log.info("lost connection- retry") + await asyncio.sleep(4) + + async def _runOneServerSession(self): + try: async with aiohttp.ClientSession() as sess: async with sess.ws_connect(self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph') as ws: self.ws = ws @@ -113,10 +121,9 @@ except Exception: traceback.print_exc() raise - - await self._lostRdfdbConnection() - log.info("lost connection- retry") - await asyncio.sleep(4) + except _CorruptionNeedResync: + log.warning("_CorruptionNeedResync") + await self._lostRdfdbConnection() async def _onIncomingMsg(self, body: str): j = json.loads(body) @@ -139,26 +146,6 @@ log.info(f'cleared graph to {len(self._graph)} (should be 0)') log.error('graph is not updating- you need to restart') - async def _resync(self): - """ - get the whole graph again from the server (e.g. we had a - conflict while applying a patch and want to return to the - truth). - - To avoid too much churn, we remember our old graph and diff it - against the replacement. This way, our callers only see the - corrections. - - Edits you make during a resync will surely be lost, so I - should just fail them. There should be a notification back to - UIs who want to show that we're doing a resync. - """ - log.info('resync') - await self.ws.close() - - # diff against old entire graph - # broadcast that change - async def runDepsOnNewPatch(self, p): # See AutoDepGraphApi pass @@ -167,7 +154,8 @@ """send this patch to the server and apply it to our local graph and run handlers""" - if not self.isConnected or self.currentClient is None: + if not self.isConnected: + # todo: queue these. it's not the caller's problem log.warn("not currently connected- dropping patch") return @@ -184,8 +172,7 @@ self._applyPatchLocally(p) except ValueError as e: log.error(e) - await self._resync() - return + raise _CorruptionNeedResync() log.debug('runDepsOnNewPatch') await self.runDepsOnNewPatch(p) log.debug('sendPatch') @@ -198,6 +185,7 @@ prefixes. async, not guaranteed to finish before any particular file flush """ + raise NotImplementedError await self.httpSession.post(self.rdfdbRoot + 'prefixes', data=json.dumps({'ctx': ctx, 'prefixes': prefixes})) def _applyPatchLocally(self, p: Patch): @@ -219,8 +207,7 @@ self._applyPatchLocally(p) except ValueError as e: log.error(e) - await self._resync() - return + raise _CorruptionNeedResync() try: await self.runDepsOnNewPatch(p)