Mercurial > code > home > repos > rdfdb
changeset 133:f11770a0a797
SyncedGraph finally sheds intermediate patches if it's falling behind
author | drewp@bigasterisk.com |
---|---|
date | Mon, 29 May 2023 22:28:55 -0700 |
parents | 453726e6f891 |
children | d3d6a1f62222 |
files | rdfdb/syncedgraph/syncedgraph_base.py |
diffstat | 1 files changed, 41 insertions(+), 18 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/syncedgraph/syncedgraph_base.py Mon May 29 22:26:18 2023 -0700 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Mon May 29 22:28:55 2023 -0700 @@ -5,7 +5,7 @@ from typing import Any, Optional, cast import aiohttp -from rdfdb.patch import Patch +from rdfdb.patch import Patch, compactPatches from rdfdb.rdflibpatch import patchQuads from rdflib import ConjunctiveGraph, URIRef @@ -98,6 +98,10 @@ self.initiallySynced = asyncio.Future() self._graph = ConjunctiveGraph() + self.inbox: asyncio.Queue[Patch] = asyncio.Queue() + + asyncio.create_task(self._processInbox()) + # todo: # AutoDepGraphApi.__init__(self) @@ -144,7 +148,9 @@ log.debug(f'graph has {len(self._graph)} . lets clear it') await self._patchLocally(Patch(delQuads=self._graph.quads())) log.info(f'cleared graph to {len(self._graph)} (should be 0)') - log.error('graph is not updating- you need to restart') + if len(self._graph): + raise ValueError(f"graph should be empty; contains {len(self._graph)}") + log.info(f'ready to get replacement graph contents') async def runDepsOnNewPatch(self, p): # See AutoDepGraphApi @@ -196,7 +202,6 @@ def _applyPatchLocally(self, p: Patch): # .. and disconnect on failure patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) - log.debug("graph now has %s statements" % len(self._graph)) async def _onPatchFromDb(self, p: Patch): """ @@ -206,19 +211,37 @@ if len(p.addQuads) > 50: log.debug('server has sent us %s', p.shortSummary()) else: - log.debug('server has sent us %s', p) + log.debug('server has sent us: %s', p) + self.inbox.put_nowait(p) + + async def _processInbox(self): + while True: + pending = [await self.inbox.get()] + while True: + try: + pending.append(self.inbox.get_nowait()) + except asyncio.QueueEmpty: + break + + log.debug(f'got {len(pending)} pending patches') - try: - self._applyPatchLocally(p) - except ValueError as e: - log.error(e) - raise _CorruptionNeedResync() - - try: - await self.runDepsOnNewPatch(p) - except Exception: - # don't reflect this error back to the server; we did - # receive its patch correctly. However, we're in a bad - # state since some dependencies may not have rerun - traceback.print_exc() - log.warn("some graph dependencies may not have completely run") + compacted = compactPatches(pending) + if len(compacted) < len(pending): + log.info(f'compacted {len(pending)} down to {len(compacted)}') + + for p in compacted: + log.debug('processInbox: applying patch') + try: + self._applyPatchLocally(p) + except ValueError as e: + log.error(e) + raise _CorruptionNeedResync() + + try: + await self.runDepsOnNewPatch(p) + except Exception: + # don't reflect this error back to the server; we did + # receive its patch correctly. However, we're in a bad + # state since some dependencies may not have rerun + traceback.print_exc() + log.warn("some graph dependencies may not have completely run")