changeset 129:69ba5b86ce6c

delay file writes a little, so the file io activity doesn't spike with patch rate
author drewp@bigasterisk.com
date Sat, 27 May 2023 19:06:48 -0700
parents bf5353908f3c
children d195a5f50137
files rdfdb/watched_files.py
diffstat 1 files changed, 23 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/watched_files.py	Sat May 27 17:55:20 2023 -0700
+++ b/rdfdb/watched_files.py	Sat May 27 19:06:48 2023 -0700
@@ -2,13 +2,14 @@
 import dataclasses
 import logging
 from pathlib import Path
-from typing import Callable, Iterable, Optional, Type, Union
+from typing import Callable, Dict, Iterable, Optional, Type, Union
 
 import asyncinotify
 from asyncinotify import Event, Inotify, Mask
 
 log = logging.getLogger('rdfdb.file')
 
+MAX_FILE_DIRTY_SECS = 1
 
 class FileIsGone:
     pass
@@ -44,18 +45,39 @@
 
         self._notifier = Inotify()
         self.fileEditEvents = asyncio.Queue()
+        self.filesToWrite = asyncio.Queue()
 
         log.info("setup watches")
         for p in dirsToWatch:
             self._watchTree(p)
         self._convertEventsTask = asyncio.create_task(self._convertEvents())
+        self._flushWritesTask = asyncio.create_task(self._flushWrites())
 
     def cancel(self):
         """tests have to call this but i wish they didn't"""
         self._convertEventsTask.cancel()
 
     def writeFile(self, p: Path, content: str):
+        # this will be committed soon
         log.debug(f'writeFile {p.name} len={len(content)}')
+        self.filesToWrite.put_nowait((p, content))
+
+    async def _flushWrites(self):
+        while True:
+            if self.filesToWrite.empty():
+                await asyncio.sleep(MAX_FILE_DIRTY_SECS)
+                continue
+
+            latestContents: Dict[Path, str] = {}
+            while not self.filesToWrite.empty():
+                p, content = self.filesToWrite.get_nowait()
+                latestContents[p] = content
+
+            log.info(f'committing {[p.name for p in latestContents]}')
+            for p, content in latestContents.items():
+                self._commitFile(p, content)
+
+    def _commitFile(self, p: Path, content: str):
         tmpOut = Path(str(p) + ".rdfdb-temp")
         tmpOut.write_text(content)