Mercurial > code > home > repos > rdfdb
view rdfdb/watched_files_test.py @ 99:22f81cb04da4
WIP graph_file replaced with two layers: text files with paths and change monitoring, and a graph layer over that
author | drewp@bigasterisk.com |
---|---|
date | Mon, 30 May 2022 20:28:17 -0700 |
parents | |
children |
line wrap: on
line source
import asyncio import logging from pathlib import Path import pytest from .watched_files import FileIsGone, WatchedFiles, FileEditEvent log = logging.getLogger() async def assert_no_events_coming(q: asyncio.Queue): """ see if inotify makes an event or stays quiet """ inotify_max_wait_secs = 0.1 with pytest.raises(asyncio.TimeoutError): # no events from the empty dir ev = await asyncio.wait_for(q.get(), timeout=inotify_max_wait_secs) log.error(f'expected no events; got {ev}', stacklevel=2) @pytest.mark.asyncio async def test_notice_file_exists(tmpdir): p = Path(tmpdir.mkdir("root")) (p / "hello.n3").write_text("# some n3") w = WatchedFiles([Path(p)], filenameFilter=lambda x: True) ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) assert ev == FileEditEvent(path=p / "hello.n3", content="# some n3") await assert_no_events_coming(w.fileEditEvents) w.cancel() # @pytest.mark.asyncio # async def test_ignore_other_extensions(tmpdir): # p = tmpdir.mkdir("root") # p.join("hello.txt").write("# some other text") # w = WatchedFiles({Path(p): EX}) # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_notice_file_rewrite(tmpdir): # root = tmpdir.mkdir("root") # someFile = root / 'hello.n3' # someFile.write("# some n3") # w = WatchedFiles({Path(root): EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3") # log.info('new content write') # someFile.write("# new content") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# new content") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_notice_file_create(tmpdir): # root = tmpdir.mkdir("root") # w = WatchedFiles({Path(root): EX}) # someFile = root / 'hello.n3' # someFile.write("# some n3") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_notice_file_delete(tmpdir): # someFile = Path(tmpdir) / 'hello.n3' # someFile.write_text("# some n3") # w = WatchedFiles({Path(tmpdir): EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3") # someFile.unlink() # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content=FileIsGone) # await assert_no_events_coming(w.fileEditEvents) # ########## # # subdir # @pytest.mark.asyncio # async def test_notice_subdir_file_exists(tmpdir): # sub1 = Path(tmpdir) / 'sub1' # sub1.mkdir() # someFile = sub1 / 'hello.n3' # someFile.write_text('# some n3') # w = WatchedFiles({Path(tmpdir): EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_notice_subdir_file_create(tmpdir): # sub1 = Path(tmpdir) / 'sub1' # sub1.mkdir() # w = WatchedFiles({Path(tmpdir): EX}) # someFile = sub1 / 'hello.n3' # someFile.write_text('# some n3') # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_notice_subdir_create_and_file_create(tmpdir): # w = WatchedFiles({Path(tmpdir): EX}) # sub1 = Path(tmpdir) / 'sub1' # sub1.mkdir() # await assert_no_events_coming(w.fileEditEvents) # someFile = sub1 / 'hello.n3' # someFile.write_text('# some n3') # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3") # await assert_no_events_coming(w.fileEditEvents) # ############# # # moves # @pytest.mark.asyncio # async def test_file_move_looks_like_delete_then_create(tmpdir): # root = Path(tmpdir) # (root / "oldname.n3").write_text("# some n3") # w = WatchedFiles({root: EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3") # (root / "oldname.n3").rename(root / "newname.n3") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some n3") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_move_to_other_extension_looks_like_delete(tmpdir): # root = Path(tmpdir) # (root / "oldname.n3").write_text("# some n3") # w = WatchedFiles({root: EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3") # (root / "oldname.n3").rename(root / "newname.txt") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone) # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_move_to_matched_extension_looks_like_create(tmpdir): # root = Path(tmpdir) # (root / "oldname.txt").write_text("# some data") # w = WatchedFiles({root: EX}) # await assert_no_events_coming(w.fileEditEvents) # (root / "oldname.txt").rename(root / "newname.n3") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some data") # await assert_no_events_coming(w.fileEditEvents) # @pytest.mark.asyncio # async def test_move_overwrites_existing_file_like_an_editor_might_save(tmpdir): # root = Path(tmpdir) # (root / "hello.n3").write_text("# some data") # w = WatchedFiles({root: EX}) # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some data") # (root / "hello.n3.new").write_text("# some new data") # (root / "hello.n3.new").rename(root / "hello.n3") # ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0) # assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some new data") # await assert_no_events_coming(w.fileEditEvents) # ############## # # writeToFile # @pytest.mark.asyncio # async def test_write_new_file(tmpdir): # root = Path(tmpdir) # w = WatchedFiles({root: EX}) # w.writeGraph(EX + 'hello', '# some n3') # await assert_no_events_coming(w.fileEditEvents) # assert (root / 'hello.n3').read_text() == "# some n3" # @pytest.mark.asyncio # async def test_write_new_file_in_new_dirs(tmpdir): # root = Path(tmpdir) # w = WatchedFiles({root: EX}) # w.writeGraph(EX + 'a/b/hello', '# some n3') # await assert_no_events_coming(w.fileEditEvents) # assert (root / 'a/b/hello.n3').read_text() == "# some n3" # @pytest.mark.asyncio # async def test_write_over_existing_file(tmpdir): # root = Path(tmpdir) # (root / "hello.n3").write_text("# some n3") # w = WatchedFiles({root: EX}) # w.writeGraph(EX + 'hello', '# some new n3') # await assert_no_events_coming(w.fileEditEvents) # assert (root / 'hello.n3').read_text() == "# some new n3"