view rdfdb/watched_files.py @ 109:bc643d61bb7c

format & comments
author drewp@bigasterisk.com
date Mon, 30 May 2022 22:55:20 -0700
parents b0f922c8c728
children be3ee1d50d28
line wrap: on
line source

import asyncio
import dataclasses
import logging
from pathlib import Path
from typing import Callable, Iterable, Optional, Type, Union

import asyncinotify
from asyncinotify import Event, Inotify, Mask

log = logging.getLogger('watchedfiles')


class FileIsGone:
    """Marker type: the class itself is used as FileEditEvent.content when a file no longer exists."""


@dataclasses.dataclass
class FileEditEvent:
    """
    One notification about a watched file.

    `content` holds the file's text, or the FileIsGone class itself (not an
    instance) when the file was deleted or moved away.
    """
    # the file this event is about
    path: Path
    # text read from `path`, or the FileIsGone class
    content: Union[str, Type[FileIsGone]]


class WatchedFiles:
    """
    filenames and text content only; yields FileEditEvents

    Notes from a previous version:

        # emacs save comes in as IN_MOVE_SELF, maybe

        # I was hoping not to watch IN_CHANGED and get lots of
        # half-written files, but emacs doesn't close its files after
        # a write, so there's no other event. I could try to sleep
        # until after all the writes are done, but I think the only
        # bug left is that we'll retry too aggressively on a file
        # that's being written

    """
    """queue of file contents that the caller should use"""
    fileEditEvents: asyncio.Queue[FileEditEvent]

    def __init__(self, dirsToWatch: Iterable[Path], filenameFilter: Callable[[Path], bool]):
        """
        watches subtrees of the given dirs

        dirsToWatch: directories whose whole subtrees get inotify watches.
            Watchable files already present are read right away and queued
            on `fileEditEvents`.
        filenameFilter: accepted, but not consulted by this class at
            present; `_watchablePath` decides which files count.

        Must be called while an asyncio event loop is running, because the
        event-conversion task is started with `asyncio.create_task`.
        """

        self._notifier = Inotify()
        self.fileEditEvents = asyncio.Queue()

        log.info("setup watches")
        for p in dirsToWatch:
            self._watchTree(p)
        # background task that turns inotify events into FileEditEvents;
        # stopped by cancel()
        self._convertEventsTask = asyncio.create_task(self._convertEvents())

    def cancel(self):
        """tests have to call this but i wish they didn't"""
        self._convertEventsTask.cancel()

    def writeFile(self, p: Path, content: str):

        tmpOut = Path(str(p) + ".rdfdb-temp")
        tmpOut.write_text(content)

        self.lastWriteTimestamp = tmpOut.stat().st_mtime
        tmpOut.rename(p)

        p.write_text(content)
        # and don't trigger event, etc

    ####################################################################################

    def _watchTree(self, top: Path):
        """
        Watch `top` and, recursively, every directory below it.

        Each watchable file found along the way gets an immediate read event
        and is passed to _watchFile.
        """
        self._watchDir(top)
        for entry in top.iterdir():
            if entry.is_dir():
                self._watchTree(entry)
            elif self._watchablePath(entry):
                # initial read for existing file
                self._genReadEvent(entry)
                self._watchFile(entry)

    def _addWatch(self, p: Path, mask: Mask):
        """
        Add an inotify watch on `p` with `mask` plus MASK_CREATE.

        quietly no-ops upon an existing matching-inode watch; any other
        InotifyError is logged and re-raised.
        """
        # args carried by the InotifyError that means "already watched"
        alreadyWatchedArgs = ('Call failed, errno 17: File exists',)
        try:
            self._notifier.add_watch(p, mask | Mask.MASK_CREATE)
        except asyncinotify.InotifyError as err:
            if err.args != alreadyWatchedArgs:
                log.error(f"trying to watch {p}:")
                raise

    def _watchFile(self, p: Path):
        """
        Log the request only; per-file inotify watches are currently disabled.

        This method used to `return` unconditionally just before an
        `_addWatch` call, so that call could never run and has been removed.
        It asked for CLOSE_WRITE, OPEN, MOVED_FROM, MOVED_TO, CREATE, DELETE,
        DELETE_SELF, MOVE_SELF and IGNORED on the file, in case per-file
        watches are brought back.
        """
        log.info(f'watchFile({p})')
        # old code said this:
        #     inFile = correctToTopdirPrefix(self.dirUriMap, inFile)

    def _watchDir(self, p: Path):
        """
        Add an inotify watch on the directory `p`.

        `p` must already be a directory (checked with an assert).
        """
        log.info(f'watchDir({p})')
        assert p.is_dir()

        # Deliberately not requested: ACCESS, CLOSE_NOWRITE, IGNORED, MODIFY,
        # MOVE_SELF, OPEN.
        dirMask = (Mask.CLOSE_WRITE | Mask.CREATE | Mask.DELETE | Mask.DELETE_SELF |
                   Mask.ISDIR | Mask.MOVED_FROM | Mask.MOVED_TO)
        self._addWatch(p, mask=dirMask)

    async def _convertEvents(self):
        """
        get inotify events, emit FileEditEvents

        Loops forever, handing each event from the notifier to _genEvent.
        A failure while handling one event (for example _genReadEvent
        re-raising FileNotFoundError for a file that has already vanished) is
        logged and the loop carries on. Otherwise the first such error would
        end this background task and no later edits would be reported.

        Errors from the notifier itself, and task cancellation, still
        propagate and end the loop.
        """
        while True:
            ev = await self._notifier.get()
            try:
                self._genEvent(ev, ev.path)
            except Exception:
                log.exception(f'failed to handle inotify event {ev!r}')

    def _genReadEvent(self, p: Path):
        if not self._watchablePath(p):
            return
        try:
            content = p.read_text()
        except FileNotFoundError:
            log.error(f'while making WatchEvent for file {p}')
            raise
        log.info('--> watchevent with content')
        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=content))

    def _genGoneEvent(self, p: Path):
        """
        Queue a FileEditEvent for `p` whose content is the FileIsGone class.

        No watchable-path check happens here, so callers decide whether `p`
        is worth reporting.
        """
        # probably we could reduce this to "a file we tracked is gone" since
        # the rest shouldn't matter. And because they're editor swp files.
        log.info('--> watchevent with FileIsGone')
        goneEvent = FileEditEvent(path=p, content=FileIsGone)
        self.fileEditEvents.put_nowait(goneEvent)

    def _watchablePath(self, p: Path):
        return p.name.endswith('.n3')

    def _genEvent(self, iev: Event, p: Optional[Path]):
        if p is None:
            raise TypeError
        if p.name.endswith('.rdfdb-temp'):
            return
        log.info(f'ino callback path={str(p)[-30:]!s:30} watchpath={str(iev.path)[-30:]!s:30} {iev.mask!r}')

        if iev.mask == Mask.DELETE_SELF:
            self._genGoneEvent(p)
            if iev.watch is None:
                raise NotImplementedError
            self._notifier.rm_watch(iev.watch)
        elif iev.mask == Mask.CLOSE_WRITE:
            self._genReadEvent(p)
        elif iev.mask == Mask.DELETE:
            self._genGoneEvent(p)
        elif iev.mask == Mask.ISDIR | Mask.CREATE:
            self._watchDir(p)
        elif iev.mask == Mask.MOVED_FROM:
            if self._watchablePath(p):
                self._genGoneEvent(p)
        elif iev.mask == Mask.MOVED_TO:
            if self._watchablePath(p):
                self._genReadEvent(p)