Mercurial > code > home > repos > rdfdb
changeset 129:69ba5b86ce6c
Delay file writes a little so that file I/O activity doesn't spike with the patch rate.
author | drewp@bigasterisk.com |
---|---|
date | Sat, 27 May 2023 19:06:48 -0700 |
parents | bf5353908f3c |
children | d195a5f50137 |
files | rdfdb/watched_files.py |
diffstat | 1 files changed, 23 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/watched_files.py Sat May 27 17:55:20 2023 -0700 +++ b/rdfdb/watched_files.py Sat May 27 19:06:48 2023 -0700 @@ -2,13 +2,14 @@ import dataclasses import logging from pathlib import Path -from typing import Callable, Iterable, Optional, Type, Union +from typing import Callable, Dict, Iterable, Optional, Type, Union import asyncinotify from asyncinotify import Event, Inotify, Mask log = logging.getLogger('rdfdb.file') +MAX_FILE_DIRTY_SECS = 1 class FileIsGone: pass @@ -44,18 +45,39 @@ self._notifier = Inotify() self.fileEditEvents = asyncio.Queue() + self.filesToWrite = asyncio.Queue() log.info("setup watches") for p in dirsToWatch: self._watchTree(p) self._convertEventsTask = asyncio.create_task(self._convertEvents()) + self._flushWritesTask = asyncio.create_task(self._flushWrites()) def cancel(self): """tests have to call this but i wish they didn't""" self._convertEventsTask.cancel() def writeFile(self, p: Path, content: str): + # this will be committed soon log.debug(f'writeFile {p.name} len={len(content)}') + self.filesToWrite.put_nowait((p, content)) + + async def _flushWrites(self): + while True: + if self.filesToWrite.empty(): + await asyncio.sleep(MAX_FILE_DIRTY_SECS) + continue + + latestContents: Dict[Path, str] = {} + while not self.filesToWrite.empty(): + p, content = self.filesToWrite.get_nowait() + latestContents[p] = content + + log.info(f'committing {[p.name for p in latestContents]}') + for p, content in latestContents.items(): + self._commitFile(p, content) + + def _commitFile(self, p: Path, content: str): tmpOut = Path(str(p) + ".rdfdb-temp") tmpOut.write_text(content)