# HG changeset patch # User drewp@bigasterisk.com # Date 1685239608 25200 # Node ID 69ba5b86ce6cbf0db49902660e2b163ede34618f # Parent bf5353908f3cdd8b7f00d681f10d4f3185017df8 delay file writes a little, so the file io activity doesn't spike with patch rate diff -r bf5353908f3c -r 69ba5b86ce6c rdfdb/watched_files.py --- a/rdfdb/watched_files.py Sat May 27 17:55:20 2023 -0700 +++ b/rdfdb/watched_files.py Sat May 27 19:06:48 2023 -0700 @@ -2,13 +2,14 @@ import dataclasses import logging from pathlib import Path -from typing import Callable, Iterable, Optional, Type, Union +from typing import Callable, Dict, Iterable, Optional, Type, Union import asyncinotify from asyncinotify import Event, Inotify, Mask log = logging.getLogger('rdfdb.file') +MAX_FILE_DIRTY_SECS = 1 class FileIsGone: pass @@ -44,18 +45,39 @@ self._notifier = Inotify() self.fileEditEvents = asyncio.Queue() + self.filesToWrite = asyncio.Queue() log.info("setup watches") for p in dirsToWatch: self._watchTree(p) self._convertEventsTask = asyncio.create_task(self._convertEvents()) + self._flushWritesTask = asyncio.create_task(self._flushWrites()) def cancel(self): """tests have to call this but i wish they didn't""" self._convertEventsTask.cancel() def writeFile(self, p: Path, content: str): + # this will be committed soon log.debug(f'writeFile {p.name} len={len(content)}') + self.filesToWrite.put_nowait((p, content)) + + async def _flushWrites(self): + while True: + if self.filesToWrite.empty(): + await asyncio.sleep(MAX_FILE_DIRTY_SECS) + continue + + latestContents: Dict[Path, str] = {} + while not self.filesToWrite.empty(): + p, content = self.filesToWrite.get_nowait() + latestContents[p] = content + + log.info(f'committing {[p.name for p in latestContents]}') + for p, content in latestContents.items(): + self._commitFile(p, content) + + def _commitFile(self, p: Path, content: str): tmpOut = Path(str(p) + ".rdfdb-temp") tmpOut.write_text(content)