# view rdfdb/watched_files_test.py @ 99:22f81cb04da4
#
# WIP graph_file replaced with two layers: text files with paths and change monitoring, and a graph layer over that
# author drewp@bigasterisk.com
# date Mon, 30 May 2022 20:28:17 -0700
# parents
# children
# line wrap: on
# line source

import asyncio
import logging
from pathlib import Path
import pytest

from .watched_files import FileIsGone, WatchedFiles, FileEditEvent
log = logging.getLogger()


async def assert_no_events_coming(q: asyncio.Queue):
    """
    wait briefly on q and require that nothing arrives

    The wait is expected to end in asyncio.TimeoutError. If an item does
    show up, it is logged as an error (attributed to the caller via
    stacklevel=2) and pytest.raises then fails because no timeout was
    raised.
    """
    quiet_period_secs = 0.1
    expect_timeout = pytest.raises(asyncio.TimeoutError)
    with expect_timeout:
        # a quiet queue means this get() never completes before the timeout
        pending_get = q.get()
        ev = await asyncio.wait_for(pending_get, timeout=quiet_period_secs)
        # only reached when an unexpected event was delivered
        log.error(f'expected no events; got {ev}', stacklevel=2)


@pytest.mark.asyncio
async def test_notice_file_exists(tmpdir):
    """
    a file that is already in the watched dir when WatchedFiles is created
    is reported as one FileEditEvent carrying its path and content, and no
    further events follow
    """
    p = Path(tmpdir.mkdir("root"))
    (p / "hello.n3").write_text("# some n3")
    # filenameFilter accepts every name, so hello.n3 is not filtered out
    w = WatchedFiles([p], filenameFilter=lambda x: True)

    # run the checks under try/finally so w.cancel() is still called when an
    # assertion fails or the wait for the first event times out
    try:
        ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
        assert ev == FileEditEvent(path=p / "hello.n3", content="# some n3")

        await assert_no_events_coming(w.fileEditEvents)
    finally:
        w.cancel()

# @pytest.mark.asyncio
# async def test_ignore_other_extensions(tmpdir):
#     p = tmpdir.mkdir("root")
#     p.join("hello.txt").write("# some other text")
#     w = WatchedFiles({Path(p): EX})

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_notice_file_rewrite(tmpdir):
#     root = tmpdir.mkdir("root")
#     someFile = root / 'hello.n3'
#     someFile.write("# some n3")

#     w = WatchedFiles({Path(root): EX})

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")

#     log.info('new content write')
#     someFile.write("# new content")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# new content")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_notice_file_create(tmpdir):
#     root = tmpdir.mkdir("root")

#     w = WatchedFiles({Path(root): EX})

#     someFile = root / 'hello.n3'
#     someFile.write("# some n3")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_notice_file_delete(tmpdir):
#     someFile = Path(tmpdir) / 'hello.n3'
#     someFile.write_text("# some n3")

#     w = WatchedFiles({Path(tmpdir): EX})

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")

#     someFile.unlink()

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content=FileIsGone)

#     await assert_no_events_coming(w.fileEditEvents)

# ##########
# # subdir

# @pytest.mark.asyncio
# async def test_notice_subdir_file_exists(tmpdir):
#     sub1 = Path(tmpdir) / 'sub1'
#     sub1.mkdir()
#     someFile = sub1 / 'hello.n3'
#     someFile.write_text('# some n3')

#     w = WatchedFiles({Path(tmpdir): EX})

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_notice_subdir_file_create(tmpdir):
#     sub1 = Path(tmpdir) / 'sub1'
#     sub1.mkdir()

#     w = WatchedFiles({Path(tmpdir): EX})

#     someFile = sub1 / 'hello.n3'
#     someFile.write_text('# some n3')

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_notice_subdir_create_and_file_create(tmpdir):
#     w = WatchedFiles({Path(tmpdir): EX})

#     sub1 = Path(tmpdir) / 'sub1'
#     sub1.mkdir()

#     await assert_no_events_coming(w.fileEditEvents)

#     someFile = sub1 / 'hello.n3'
#     someFile.write_text('# some n3')

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")

#     await assert_no_events_coming(w.fileEditEvents)

# #############
# # moves

# @pytest.mark.asyncio
# async def test_file_move_looks_like_delete_then_create(tmpdir):
#     root = Path(tmpdir)
#     (root / "oldname.n3").write_text("# some n3")

#     w = WatchedFiles({root: EX})
#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")

#     (root / "oldname.n3").rename(root / "newname.n3")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some n3")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_move_to_other_extension_looks_like_delete(tmpdir):
#     root = Path(tmpdir)
#     (root / "oldname.n3").write_text("# some n3")

#     w = WatchedFiles({root: EX})
#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")

#     (root / "oldname.n3").rename(root / "newname.txt")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_move_to_matched_extension_looks_like_create(tmpdir):
#     root = Path(tmpdir)
#     (root / "oldname.txt").write_text("# some data")

#     w = WatchedFiles({root: EX})
#     await assert_no_events_coming(w.fileEditEvents)

#     (root / "oldname.txt").rename(root / "newname.n3")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some data")

#     await assert_no_events_coming(w.fileEditEvents)

# @pytest.mark.asyncio
# async def test_move_overwrites_existing_file_like_an_editor_might_save(tmpdir):
#     root = Path(tmpdir)
#     (root / "hello.n3").write_text("# some data")

#     w = WatchedFiles({root: EX})
#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some data")

#     (root / "hello.n3.new").write_text("# some new data")
#     (root / "hello.n3.new").rename(root / "hello.n3")

#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some new data")

#     await assert_no_events_coming(w.fileEditEvents)

# ##############
# # writeToFile

# @pytest.mark.asyncio
# async def test_write_new_file(tmpdir):
#     root = Path(tmpdir)

#     w = WatchedFiles({root: EX})
#     w.writeGraph(EX + 'hello', '# some n3')

#     await assert_no_events_coming(w.fileEditEvents)

#     assert (root / 'hello.n3').read_text() == "# some n3"

# @pytest.mark.asyncio
# async def test_write_new_file_in_new_dirs(tmpdir):
#     root = Path(tmpdir)

#     w = WatchedFiles({root: EX})
#     w.writeGraph(EX + 'a/b/hello', '# some n3')

#     await assert_no_events_coming(w.fileEditEvents)

#     assert (root / 'a/b/hello.n3').read_text() == "# some n3"

# @pytest.mark.asyncio
# async def test_write_over_existing_file(tmpdir):
#     root = Path(tmpdir)
#     (root / "hello.n3").write_text("# some n3")

#     w = WatchedFiles({root: EX})
#     w.writeGraph(EX + 'hello', '# some new n3')

#     await assert_no_events_coming(w.fileEditEvents)

#     assert (root / 'hello.n3').read_text() == "# some new n3"