Mercurial > code > home > repos > rdfdb
view rdfdb/watched_files.py @ 118:418050704a42
fix tests
author | drewp@bigasterisk.com |
---|---|
date | Tue, 23 May 2023 19:43:57 -0700 |
parents | be3ee1d50d28 |
children | 3733efe1fd19 |
line wrap: on
line source
import asyncio
import dataclasses
import logging
from pathlib import Path
from typing import Callable, Iterable, Optional, Type, Union

import asyncinotify
from asyncinotify import Event, Inotify, Mask

log = logging.getLogger('watchedfiles')


class FileIsGone:
    """Marker for a missing file; the class itself is used as FileEditEvent.content."""


@dataclasses.dataclass
class FileEditEvent:
    """One queued notification: `path` has new text `content`, or is gone (content is FileIsGone)."""
    path: Path
    content: Union[str, Type[FileIsGone]]


class WatchedFiles:
    """
    filenames and text content only; yields FileEditEvents

    Notes from a previous version:

    # emacs save comes in as IN_MOVE_SELF, maybe

    # I was hoping not to watch IN_CHANGED and get lots of
    # half-written files, but emacs doesn't close its files after
    # a write, so there's no other event. I could try to sleep
    # until after all the writes are done, but I think the only
    # bug left is that we'll retry too aggressively on a file
    # that's being written
    """

    # queue of file contents that the caller should use
    fileEditEvents: asyncio.Queue[FileEditEvent]

    def __init__(self, dirsToWatch: Iterable[Path], filenameFilter: Callable[[Path], bool]):
        """watches subtrees of the given dirs

        dirsToWatch: directories to watch, recursively. Each matching file
          that already exists is read once and queued immediately.
        filenameFilter: extra predicate for paths. A file is reported only if
          its name ends in '.n3' AND filenameFilter(path) is true. (The '.n3'
          requirement is kept so that callers passing a permissive filter see
          exactly the events they saw when the filter was ignored.)

        Calls asyncio.create_task, so this must run while an event loop is
        running.
        """
        self._notifier = Inotify()
        self.fileEditEvents = asyncio.Queue()
        # Must be set before the first _watchTree call, which consults it.
        self._filenameFilter = filenameFilter
        # mtime of the temp file from the most recent writeFile(); None until then.
        self.lastWriteTimestamp: Optional[float] = None

        log.info("setup watches")
        for p in dirsToWatch:
            self._watchTree(p)
        self._convertEventsTask = asyncio.create_task(self._convertEvents())

    def cancel(self) -> None:
        """tests have to call this but i wish they didn't

        Cancels the background task that converts inotify events.
        """
        self._convertEventsTask.cancel()

    def writeFile(self, p: Path, content: str) -> None:
        """Write `content` to `p` by way of a sibling '<p>.rdfdb-temp' file.

        Events whose path ends in '.rdfdb-temp' are dropped by _genEvent, so
        the temp file itself is never reported.
        """
        tmpOut = Path(str(p) + ".rdfdb-temp")
        tmpOut.write_text(content)

        self.lastWriteTimestamp = tmpOut.stat().st_mtime
        tmpOut.rename(p)

        # NOTE(review): p already holds `content` after the rename, so this
        # second write looks redundant (and it moves p's mtime past
        # lastWriteTimestamp). It may be what produces a trailing CLOSE_WRITE
        # event after the rename's events -- confirm before removing.
        p.write_text(content)
        # and don't trigger event, etc

    ####################################################################################

    def _watchTree(self, top: Path) -> None:
        """Watch `top` and every directory below it; queue and watch each matching file."""
        self._watchDir(top)
        for child in top.iterdir():
            if child.is_dir():
                self._watchTree(child)
            elif self._watchablePath(child):
                self._genReadEvent(child)  # initial read for existing file
                self._watchFile(child)

    def _addWatch(self, p: Path, mask: Mask) -> None:
        """quietly no-ops upon an existing matching-inode watch

        Any other InotifyError is logged and re-raised.
        """
        try:
            # MASK_CREATE asks for an error instead of silently modifying a
            # watch that already exists.
            self._notifier.add_watch(p, mask | Mask.MASK_CREATE)
        except asyncinotify.InotifyError as e:
            # The "already watched" case is recognised by this exact message.
            if e.args == ('Call failed, errno 17: File exists',):
                return
            log.error(f"trying to watch {p}:")
            raise

    def _watchFile(self, p: Path) -> None:
        """Add a watch on the single file `p`."""
        log.info(f'watchFile({p})')
        # old code said this:
        #     inFile = correctToTopdirPrefix(self.dirUriMap, inFile)

        return self._addWatch(
            p,
            mask=(
                # Mask.ACCESS |
                # Mask.MODIFY |
                Mask.CLOSE_WRITE |
                # Mask.CLOSE_NOWRITE |
                Mask.OPEN |
                Mask.MOVED_FROM |
                Mask.MOVED_TO |
                Mask.CREATE |
                Mask.DELETE |
                Mask.DELETE_SELF |
                Mask.MOVE_SELF |
                Mask.IGNORED))

    def _watchDir(self, p: Path) -> None:
        """Add a watch on directory `p` (not recursive; see _watchTree)."""
        log.info(f'watchDir({p})')
        assert p.is_dir()
        self._addWatch(
            p,
            mask=(
                # Mask.ACCESS |
                # Mask.CLOSE_NOWRITE |
                # Mask.IGNORED |
                # Mask.MODIFY |
                # Mask.MOVE_SELF |
                # Mask.OPEN |
                Mask.CLOSE_WRITE |
                Mask.CREATE |
                Mask.DELETE |
                Mask.DELETE_SELF |
                Mask.ISDIR |
                Mask.MOVED_FROM |
                Mask.MOVED_TO))

    async def _convertEvents(self) -> None:
        """get inotify events, emit FileEditEvents

        Runs until cancelled. An exception raised by _genEvent ends this loop.
        """
        while True:
            ev = await self._notifier.get()
            self._genEvent(ev, ev.path)

    def _genReadEvent(self, p: Path) -> None:
        """Queue a FileEditEvent carrying the current text of `p`.

        Does nothing for paths rejected by _watchablePath. Logs and re-raises
        FileNotFoundError if `p` cannot be read.
        """
        if not self._watchablePath(p):
            return
        try:
            content = p.read_text()
        except FileNotFoundError:
            log.error(f'while making WatchEvent for file {p}')
            raise
        log.info(f'--> watchevent on {p.name} with content')
        # todo: i now think this is the point we should realize that the content
        # is the same as what we 'recently' wrote to the file, and the caller
        # should not be bothered with an event. What is 'recently'? If the
        # file mtime matches our last write?
        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=content))

    def _genGoneEvent(self, p: Path) -> None:
        """Queue a FileEditEvent saying `p` is gone. No _watchablePath check is made here."""
        # probably we could reduce this to "a file we tracked is gone" since
        # the rest shouldn't matter. And because they're editor swp files.
        log.info('--> watchevent with FileIsGone')
        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=FileIsGone))

    def _watchablePath(self, p: Path) -> bool:
        """True if `p` should be reported: name ends in '.n3' and the caller's filter accepts it."""
        return p.name.endswith('.n3') and bool(self._filenameFilter(p))

    def _genEvent(self, iev: Event, p: Optional[Path]) -> None:
        """Turn one inotify event into zero or one FileEditEvents (or a new dir watch).

        Masks are compared with ==, so only events whose mask is exactly one
        of the values below are acted on; anything else is just logged.

        Raises TypeError if `p` is None, and NotImplementedError for a
        DELETE_SELF event that carries no watch.
        """
        if p is None:
            raise TypeError
        if p.name.endswith('.rdfdb-temp'):
            # writeFile's temp file; never reported
            return
        log.info(f'ino callback path={str(p)[-30:]!s:30} watchpath={str(iev.path)[-30:]!s:30} {iev.mask!r}')

        if iev.mask == Mask.DELETE_SELF:
            self._genGoneEvent(p)
            if iev.watch is None:
                raise NotImplementedError
            self._notifier.rm_watch(iev.watch)
        elif iev.mask == Mask.CLOSE_WRITE:
            self._genReadEvent(p)
        elif iev.mask == Mask.DELETE:
            self._genGoneEvent(p)
        elif iev.mask == Mask.ISDIR | Mask.CREATE:
            # new subdirectory: start watching it too
            self._watchDir(p)
        elif iev.mask == Mask.MOVED_FROM:
            if self._watchablePath(p):
                self._genGoneEvent(p)
        elif iev.mask == Mask.MOVED_TO:
            if self._watchablePath(p):
                self._genReadEvent(p)