changeset 133:f11770a0a797

SyncedGraph finally sheds intermediate patches if it's falling behind
author drewp@bigasterisk.com
date Mon, 29 May 2023 22:28:55 -0700
parents 453726e6f891
children d3d6a1f62222
files rdfdb/syncedgraph/syncedgraph_base.py
diffstat 1 files changed, 41 insertions(+), 18 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/syncedgraph/syncedgraph_base.py	Mon May 29 22:26:18 2023 -0700
+++ b/rdfdb/syncedgraph/syncedgraph_base.py	Mon May 29 22:28:55 2023 -0700
@@ -5,7 +5,7 @@
 from typing import Any, Optional, cast
 
 import aiohttp
-from rdfdb.patch import Patch
+from rdfdb.patch import Patch, compactPatches
 from rdfdb.rdflibpatch import patchQuads
 from rdflib import ConjunctiveGraph, URIRef
 
@@ -98,6 +98,10 @@
         self.initiallySynced = asyncio.Future()
         self._graph = ConjunctiveGraph()
 
+        self.inbox: asyncio.Queue[Patch] = asyncio.Queue()
+
+        asyncio.create_task(self._processInbox())
+
         # todo:
         # AutoDepGraphApi.__init__(self)
 
@@ -144,7 +148,9 @@
         log.debug(f'graph has {len(self._graph)} . lets clear it')
         await self._patchLocally(Patch(delQuads=self._graph.quads()))
         log.info(f'cleared graph to {len(self._graph)} (should be 0)')
-        log.error('graph is not updating- you need to restart')
+        if len(self._graph):
+            raise ValueError(f"graph should be empty; contains {len(self._graph)}")
+        log.info(f'ready to get replacement graph contents')
 
     async def runDepsOnNewPatch(self, p):
         # See AutoDepGraphApi
@@ -196,7 +202,6 @@
     def _applyPatchLocally(self, p: Patch):
         # .. and disconnect on failure
         patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        log.debug("graph now has %s statements" % len(self._graph))
 
     async def _onPatchFromDb(self, p: Patch):
         """
@@ -206,19 +211,37 @@
             if len(p.addQuads) > 50:
                 log.debug('server has sent us %s', p.shortSummary())
             else:
-                log.debug('server has sent us %s', p)
+                log.debug('server has sent us: %s', p)
+        self.inbox.put_nowait(p)
+
+    async def _processInbox(self):
+        while True:
+            pending = [await self.inbox.get()]
+            while True:
+                try:
+                    pending.append(self.inbox.get_nowait())
+                except asyncio.QueueEmpty:
+                    break
+
+            log.debug(f'got {len(pending)} pending patches')
 
-        try:
-            self._applyPatchLocally(p)
-        except ValueError as e:
-            log.error(e)
-            raise _CorruptionNeedResync()
-        
-        try:
-            await self.runDepsOnNewPatch(p)
-        except Exception:
-            # don't reflect this error back to the server; we did
-            # receive its patch correctly. However, we're in a bad
-            # state since some dependencies may not have rerun
-            traceback.print_exc()
-            log.warn("some graph dependencies may not have completely run")
+            compacted = compactPatches(pending)
+            if len(compacted) < len(pending):
+                log.info(f'compacted {len(pending)} down to {len(compacted)}')
+
+            for p in compacted:
+                log.debug('processInbox: applying patch')
+                try:
+                    self._applyPatchLocally(p)
+                except ValueError as e:
+                    log.error(e)
+                    raise _CorruptionNeedResync()
+                
+                try:
+                    await self.runDepsOnNewPatch(p)
+                except Exception:
+                    # don't reflect this error back to the server; we did
+                    # receive its patch correctly. However, we're in a bad
+                    # state since some dependencies may not have rerun
+                    traceback.print_exc()
+                    log.warn("some graph dependencies may not have completely run")