changeset 99:22f81cb04da4

WIP graph_file replaced with two layers: text files with paths and change monitoring, and a graph layer over that
author drewp@bigasterisk.com
date Mon, 30 May 2022 20:28:17 -0700
parents 32e2c91eeaf3
children 27dcc13f9958
files rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/watched_files.py rdfdb/watched_files_test.py rdfdb/watched_graphs.py rdfdb/watched_graphs_test.py
diffstat 6 files changed, 608 insertions(+), 44 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/graphfile.py	Mon May 30 20:39:59 2022 -0700
+++ b/rdfdb/graphfile.py	Mon May 30 20:28:17 2022 -0700
@@ -15,6 +15,7 @@
 from rdfdb.patch import Patch
 from rdfdb.rdflibpatch import inContext
 
+raise NotImplementedError('deleteme')
 reactor = cast(IReactorCore, twisted.internet.reactor)
 
 log = logging.getLogger('graphfile')
--- a/rdfdb/graphfile_test.py	Mon May 30 20:39:59 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,44 +0,0 @@
-import tempfile
-import unittest
-from pathlib import Path
-from typing import cast
-
-import mock
-from _pytest.assertion import truncate
-from rdflib import Graph, URIRef
-from twisted.internet.inotify import INotify
-
-from rdfdb.graphfile import GraphFile
-
-truncate.DEFAULT_MAX_LINES = 9999
-truncate.DEFAULT_MAX_CHARS = 9999
-
-
-class TestGraphFileOutput(unittest.TestCase):
-
-    def testMaintainsN3PrefixesFromInput(self):
-        tf = tempfile.NamedTemporaryFile(suffix='_test.n3')
-        tf.write(b'''
-        @prefix : <http://example.com/> .
-        @prefix n: <http://example.com/n/> .
-        :foo n:bar :baz .
-        ''')
-        tf.flush()
-
-        def getSubgraph(uri):
-            return Graph()
-
-        gf = GraphFile(cast(INotify, mock.Mock()), Path(tf.name), URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
-        gf.reread()
-
-        newGraph = Graph()
-        newGraph.add((URIRef('http://example.com/boo'), URIRef('http://example.com/n/two'), URIRef('http://example.com/other/ns')))
-        gf.dirty(newGraph)
-        gf.flush()
-        wroteContent = open(tf.name, 'rb').read()
-        print(wroteContent)
-        self.assertEqual(b'''@prefix : <http://example.com/> .
-@prefix n: <http://example.com/n/> .
-
-:boo n:two <http://example.com/other/ns> .
-''', wroteContent)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_files.py	Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,173 @@
+import asyncio
+import dataclasses
+import logging
+from pathlib import Path
+from typing import Callable, Iterable, Optional, Type, Union
+
+import asyncinotify
+from asyncinotify import Event, Inotify, Mask
+
+log = logging.getLogger('watchedfiles')
+
+
+class FileIsGone:
+    pass
+
+
+@dataclasses.dataclass
+class FileEditEvent:
+    path: Path
+    content: Union[str, Type[FileIsGone]]
+
+
+class WatchedFiles:
+    """
+    filenames and text content only; yields FileEditEvents
+    """
+    """queue of file contents that the caller should use"""
+    fileEditEvents: asyncio.Queue[FileEditEvent]
+
+    def __init__(self, dirsToWatch: Iterable[Path], filenameFilter: Callable[[Path], bool]):
+        """watches subtrees of the given dirs"""
+
+        self._notifier = Inotify()
+        self.fileEditEvents = asyncio.Queue()
+
+        log.info("setup watches")
+        print('watchedfiles')
+        for p in dirsToWatch:
+            self._watchTree(p)
+        self._convertEventsTask = asyncio.create_task(self._convertEvents())
+
+    def cancel(self):
+        """tests have to call this but i wish they didn't"""
+        self._convertEventsTask.cancel()
+
+    def writeFile(self, p: Path, content: str):
+
+        tmpOut = Path(str(p) + ".rdfdb-temp")
+        tmpOut.write_text(content)
+
+        self.lastWriteTimestamp = tmpOut.stat().st_mtime
+        tmpOut.rename(p)
+
+        p.write_text(content)
+        # and don't trigger event, etc
+
+    ####################################################################################
+
+    def _watchTree(self, top: Path):
+        self._watchDir(top)
+        for child in top.iterdir():
+            if child.is_dir():
+                self._watchTree(child)
+                continue
+            if self._watchablePath(child):
+                self._genReadEvent(child)  # initial read for existing file
+                self._watchFile(child)
+                continue
+
+    def _addWatch(self, p: Path, mask: Mask):
+        """quietly no-ops upon an existing matching-inode watch"""
+        try:
+            self._notifier.add_watch(p, mask | Mask.MASK_CREATE)
+        except asyncinotify.InotifyError as e:
+            if e.args == ('Call failed, errno 17: File exists',):
+                return
+            log.error(f"trying to watch {p}:")
+            raise
+
+    def _watchFile(self, p: Path):
+        log.info(f'watchFile({p})')
+        # old code said this:
+        #     inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
+
+        return
+        self._addWatch(
+            p,
+            mask=(
+                # Mask.ACCESS |  #
+                # Mask.MODIFY |  #
+                Mask.CLOSE_WRITE |  #
+                # Mask.CLOSE_NOWRITE |  #
+                Mask.OPEN |  #
+                Mask.MOVED_FROM |  #
+                Mask.MOVED_TO |  #
+                Mask.CREATE |  #
+                Mask.DELETE |  #
+                Mask.DELETE_SELF |  #
+                Mask.MOVE_SELF |  #
+                Mask.IGNORED |  #
+                0))
+
+    def _watchDir(self, p: Path):
+        log.info(f'watchDir({p})')
+        assert p.is_dir()
+        self._addWatch(
+            p,
+            mask=(
+                # Mask.ACCESS |  #
+                # Mask.CLOSE_NOWRITE |  #
+                # Mask.IGNORED |  #
+                # Mask.MODIFY |  #
+                # Mask.MOVE_SELF |  #
+                # Mask.OPEN |  #
+                Mask.CLOSE_WRITE |  #
+                Mask.CREATE |  #
+                Mask.DELETE |  #
+                Mask.DELETE_SELF |  #
+                Mask.ISDIR |  #
+                Mask.MOVED_FROM |  #
+                Mask.MOVED_TO |  #
+                0))
+
+    async def _convertEvents(self):
+        """get inotify events, emit FileEditEvents"""
+        while True:
+            ev = await self._notifier.get()
+            # log.info(f'got notifier event {ev.path.name if ev.path else None} {ev.mask}')
+
+            self._genEvent(ev, ev.path)
+
+    def _genReadEvent(self, p: Path):
+        if not self._watchablePath(p):
+            return
+        try:
+            content = p.read_text()
+        except FileNotFoundError:
+            log.error(f'while making WatchEvent for file {p}')
+            raise
+        log.info('--> watchevent with content')
+        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=content))
+
+    def _genGoneEvent(self, p: Path):
+        log.info('--> watchevent with FileIsGone')
+        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=FileIsGone))
+
+    def _watchablePath(self, p: Path):
+        return p.name.endswith('.n3')
+
+    def _genEvent(self, iev: Event, p: Optional[Path]):
+        if p is None:
+            raise TypeError
+        if p.name.endswith('.rdfdb-temp'):
+            return
+        log.info(f'ino callback path={str(p)[-30:]!s:30} watchpath={str(iev.path)[-30:]!s:30} {iev.mask!r}')
+
+        if iev.mask == Mask.DELETE_SELF:
+            self._genGoneEvent(p)
+            if iev.watch is None:
+                raise NotImplementedError
+            self._notifier.rm_watch(iev.watch)
+        elif iev.mask == Mask.CLOSE_WRITE:
+            self._genReadEvent(p)
+        elif iev.mask == Mask.DELETE:
+            self._genGoneEvent(p)
+        elif iev.mask == Mask.ISDIR | Mask.CREATE:
+            self._watchDir(p)
+        elif iev.mask == Mask.MOVED_FROM:
+            if self._watchablePath(p):
+                self._genGoneEvent(p)
+        elif iev.mask == Mask.MOVED_TO:
+            if self._watchablePath(p):
+                self._genReadEvent(p)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_files_test.py	Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,246 @@
+import asyncio
+import logging
+from pathlib import Path
+import pytest
+
+from .watched_files import FileIsGone, WatchedFiles, FileEditEvent
+log = logging.getLogger()
+
+
+async def assert_no_events_coming(q: asyncio.Queue):
+    """
+    see if inotify makes an event or stays quiet
+    """
+    inotify_max_wait_secs = 0.1
+    with pytest.raises(asyncio.TimeoutError):
+        # no events from the empty dir
+        ev = await asyncio.wait_for(q.get(), timeout=inotify_max_wait_secs)
+        log.error(f'expected no events; got {ev}', stacklevel=2)
+
+
+@pytest.mark.asyncio
+async def test_notice_file_exists(tmpdir):
+    p = Path(tmpdir.mkdir("root"))
+    (p / "hello.n3").write_text("# some n3")
+    w = WatchedFiles([Path(p)], filenameFilter=lambda x: True)
+
+    ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+    assert ev == FileEditEvent(path=p / "hello.n3", content="# some n3")
+
+    await assert_no_events_coming(w.fileEditEvents)
+
+    w.cancel()
+
+# @pytest.mark.asyncio
+# async def test_ignore_other_extensions(tmpdir):
+#     p = tmpdir.mkdir("root")
+#     p.join("hello.txt").write("# some other text")
+#     w = WatchedFiles({Path(p): EX})
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_rewrite(tmpdir):
+#     root = tmpdir.mkdir("root")
+#     someFile = root / 'hello.n3'
+#     someFile.write("# some n3")
+
+#     w = WatchedFiles({Path(root): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     log.info('new content write')
+#     someFile.write("# new content")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# new content")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_create(tmpdir):
+#     root = tmpdir.mkdir("root")
+
+#     w = WatchedFiles({Path(root): EX})
+
+#     someFile = root / 'hello.n3'
+#     someFile.write("# some n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_delete(tmpdir):
+#     someFile = Path(tmpdir) / 'hello.n3'
+#     someFile.write_text("# some n3")
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     someFile.unlink()
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content=FileIsGone)
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# ##########
+# # subdir
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_file_exists(tmpdir):
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_file_create(tmpdir):
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_create_and_file_create(tmpdir):
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# #############
+# # moves
+
+# @pytest.mark.asyncio
+# async def test_file_move_looks_like_delete_then_create(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")
+
+#     (root / "oldname.n3").rename(root / "newname.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_to_other_extension_looks_like_delete(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")
+
+#     (root / "oldname.n3").rename(root / "newname.txt")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_to_matched_extension_looks_like_create(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.txt").write_text("# some data")
+
+#     w = WatchedFiles({root: EX})
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     (root / "oldname.txt").rename(root / "newname.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some data")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_overwrites_existing_file_like_an_editor_might_save(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "hello.n3").write_text("# some data")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some data")
+
+#     (root / "hello.n3.new").write_text("# some new data")
+#     (root / "hello.n3.new").rename(root / "hello.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some new data")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# ##############
+# # writeToFile
+
+# @pytest.mark.asyncio
+# async def test_write_new_file(tmpdir):
+#     root = Path(tmpdir)
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'hello', '# some n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'hello.n3').read_text() == "# some n3"
+
+# @pytest.mark.asyncio
+# async def test_write_new_file_in_new_dirs(tmpdir):
+#     root = Path(tmpdir)
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'a/b/hello', '# some n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'a/b/hello.n3').read_text() == "# some n3"
+
+# @pytest.mark.asyncio
+# async def test_write_over_existing_file(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "hello.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'hello', '# some new n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'hello.n3').read_text() == "# some new n3"
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_graphs.py	Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,149 @@
+import asyncio
+import dataclasses
+import logging
+from pathlib import Path
+import traceback
+from typing import Dict, Iterable, Tuple, cast
+
+from rdflib import Graph, URIRef
+
+from rdfdb.compact_turtle import patchN3SerializerToUseLessWhitespace
+from rdfdb.file_vs_uri import DirUriMap, fileForUri, uriFromFile
+from rdfdb.patch import ALLSTMTS
+from rdfdb.watched_files import FileIsGone, WatchedFiles
+
+patchN3SerializerToUseLessWhitespace()
+
+log = logging.getLogger('watchedgraphs')
+
+
+@dataclasses.dataclass
+class GraphEditEvent:
+    uri: URIRef
+    content: Iterable[Tuple]  # replace subgraph with these stmts
+
+
+class WatchedGraphs:
+    """
+    performs file<->uri mapping; parses/writes fiels; yields GraphEvents.
+    Includes throttling for quick graph updates.
+    """
+
+    def __init__(self, dirUriMap: DirUriMap, addlPrefixes: Dict[str, URIRef]):
+        self._dirUriMap = dirUriMap
+        self.addlPrefixes = addlPrefixes
+        self.graphEditEvents = asyncio.Queue()
+        log.info("setup watches")
+        self._wf = WatchedFiles(dirsToWatch=dirUriMap.keys(), filenameFilter=self._filenameFilter)
+        self._loopTask = asyncio.create_task(self._loop())
+
+    graphEditEvents: asyncio.Queue[GraphEditEvent]
+
+    def writeGraph(self, uri: URIRef, triples: Iterable[Tuple]):
+        """commit this subgraph to disk (soon). triples todo- maybe we want a Graph"""
+        g = Graph()
+        for a, b in self.addlPrefixes.items():
+            g.bind(a, b)
+
+        for t in triples:
+            g.add(t)
+        # something with prefixes..
+        # preserve the prefixes that were in the file
+        # use and remove any addPrefixes ones too
+        n3 = g.serialize(format='n3')
+
+        # older code, prob boring
+        # for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
+        #     self.graphToWrite.bind(p, n)
+        # self.graphToWrite.serialize(destination=f, format='n3', encoding='utf8')
+
+        self._wf.writeFile(fileForUri(self._dirUriMap, uri), n3)
+
+    def cancel(self):
+        self._loopTask.cancel()
+        self._wf.cancel()
+
+    def _filenameFilter(self, p: Path) -> bool:
+        if p.name.startswith('.'):
+            return False
+
+        ext = p.name.rsplit('.', 1)[-1]
+        if ext not in ['.n3']:
+            return False
+
+        if '/capture/' in str(p):
+            # smaller graph for now
+            return False
+
+        # an n3 file with rules makes it all the way past this reading
+        # and the serialization. Then, on the receiving side, a
+        # SyncedGraph calls graphFromNQuad on the incoming data and
+        # has a parse error. I'm not sure where this should be fixed
+        # yet.
+        if '-rules' in str(p):
+            return False
+
+        # for legacy versions, compile all the config stuff you want
+        # read into one file called config.n3. New versions won't read
+        # it.
+        if p.name == "config.n3":
+            return False
+
+        return True
+
+    async def _loop(self):
+        while True:
+            ev = await self._wf.fileEditEvents.get()
+            uri = uriFromFile(self._dirUriMap, ev.path)
+            g = Graph()
+            if ev.content is FileIsGone:
+                pass
+            else:
+                try:
+                    g.parse(data=cast(str, ev.content))  # todo: could fail here
+                except Exception:
+                    traceback.print_exc()
+                    log.warn(f'ignoring this edit to {ev.path}')
+                    continue
+
+            if not g.__len__():
+                continue
+
+            await self.graphEditEvents.put(GraphEditEvent(uri, g.triples(ALLSTMTS)))
+
+    # def aboutToPatch(self, ctx: URIRef):
+    #     """
+    #     warn us that a patch is about to come to this context. it's more
+    #     straightforward to create the new file now
+
+    #     this is meant to make the file before we add triples, so we
+    #     wouldn't see the blank file and lose those triples. But it
+    #     didn't work, so there are other measures that make us not lose
+    #     the triples from a new file. Calling this before patching the
+    #     graph is still a reasonable thing to do, though.
+    #     """
+    #     if ctx not in self.graphFiles:
+    #         outFile = fileForUri(self.dirUriMap, ctx)
+    #         assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
+    #         log.info("starting new file %r", outFile)
+
+    # def _addGraphFile(self, ctx: URIRef, path: Path):
+    #     self.addlPrefixes.setdefault(ctx, {})
+    #     self.addlPrefixes.setdefault(None, {})
+    #     gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
+    #     self.graphFiles[ctx] = gf
+    #     # fileStats.mappedGraphFiles = len(self.graphFiles)
+    #     return gf
+
+    # def dirtyFiles(self, ctxs):
+    #     """mark dirty the files that we watch in these contexts.
+
+    #     the ctx might not be a file that we already read; it might be
+    #     for a new file we have to create, or it might be for a
+    #     transient context that we're not going to save
+
+    #     if it's a ctx with no file, error
+    #     """
+    #     for ctx in ctxs:
+    #         g = self.getSubgraph(ctx)
+    #         self.graphFiles[ctx].dirty(g)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_graphs_test.py	Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,39 @@
+import asyncio
+import logging
+from pathlib import Path
+
+import pytest
+from rdflib import Namespace
+
+from .watched_graphs import WatchedGraphs
+
+log = logging.getLogger()
+
+EX = Namespace('http://example.com/')
+
+
+async def assert_no_events_coming(q: asyncio.Queue):
+    """
+    see if inotify makes an event or stays quiet
+    """
+    inotify_max_wait_secs = 0.1
+    with pytest.raises(asyncio.TimeoutError):
+        # no events from the empty dir
+        ev = await asyncio.wait_for(q.get(), timeout=inotify_max_wait_secs)
+        log.error(f'expected no events; got {ev}', stacklevel=2)
+
+
+@pytest.mark.asyncio
+async def test_make_graph_from_file(tmpdir):
+    root = Path(tmpdir.mkdir("root"))
+    someFile = root / 'hello.n3'
+    someFile.write_text("# some n3")
+
+    wg = WatchedGraphs({root: EX}, addlPrefixes={})
+
+    ev = await asyncio.wait_for(wg.graphEditEvents.get(), timeout=1.0)
+    assert ev.uri == EX['hello']
+    assert list(ev.content) == []
+
+    await assert_no_events_coming(wg.graphEditEvents)
+    wg.cancel()