rdfdb: changeset 99:22f81cb04da4
WIP graph_file replaced with two layers: text files with paths and change monitoring, and a graph layer over that
author      drewp@bigasterisk.com
date        Mon, 30 May 2022 20:28:17 -0700
parents     32e2c91eeaf3
children    27dcc13f9958
files       rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/watched_files.py rdfdb/watched_files_test.py rdfdb/watched_graphs.py rdfdb/watched_graphs_test.py
diffstat    6 files changed, 608 insertions(+), 44 deletions(-)
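The changeset description above sketches the intended architecture: a file layer (WatchedFiles, in rdfdb/watched_files.py below) that deals only in paths and text content, and a graph layer (WatchedGraphs, in rdfdb/watched_graphs.py) that maps files to URIs and parses/serializes N3. A minimal consumer of the graph layer might look like the sketch below; the class names, queue, and constructor arguments are taken from the diff, while the watched directory path and the print loop are only illustrative:

    import asyncio
    from pathlib import Path

    from rdflib import Namespace

    from rdfdb.watched_graphs import WatchedGraphs

    EX = Namespace('http://example.com/')

    async def main():
        # graph layer; it creates its own WatchedFiles file layer internally
        wg = WatchedGraphs({Path('data'): EX}, addlPrefixes={})
        try:
            while True:
                # each GraphEditEvent carries the subgraph URI and its replacement triples
                ev = await wg.graphEditEvents.get()
                print(ev.uri, list(ev.content))
        finally:
            wg.cancel()

    asyncio.run(main())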
--- a/rdfdb/graphfile.py  Mon May 30 20:39:59 2022 -0700
+++ b/rdfdb/graphfile.py  Mon May 30 20:28:17 2022 -0700
@@ -15,6 +15,7 @@
 from rdfdb.patch import Patch
 from rdfdb.rdflibpatch import inContext
 
+raise NotImplementedError('deleteme')
 reactor = cast(IReactorCore, twisted.internet.reactor)
 
 log = logging.getLogger('graphfile')
--- a/rdfdb/graphfile_test.py  Mon May 30 20:39:59 2022 -0700
+++ /dev/null  Thu Jan 01 00:00:00 1970 +0000
@@ -1,44 +0,0 @@
-import tempfile
-import unittest
-from pathlib import Path
-from typing import cast
-
-import mock
-from _pytest.assertion import truncate
-from rdflib import Graph, URIRef
-from twisted.internet.inotify import INotify
-
-from rdfdb.graphfile import GraphFile
-
-truncate.DEFAULT_MAX_LINES = 9999
-truncate.DEFAULT_MAX_CHARS = 9999
-
-
-class TestGraphFileOutput(unittest.TestCase):
-
-    def testMaintainsN3PrefixesFromInput(self):
-        tf = tempfile.NamedTemporaryFile(suffix='_test.n3')
-        tf.write(b'''
-        @prefix : <http://example.com/> .
-        @prefix n: <http://example.com/n/> .
-        :foo n:bar :baz .
-        ''')
-        tf.flush()
-
-        def getSubgraph(uri):
-            return Graph()
-
-        gf = GraphFile(cast(INotify, mock.Mock()), Path(tf.name), URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
-        gf.reread()
-
-        newGraph = Graph()
-        newGraph.add((URIRef('http://example.com/boo'), URIRef('http://example.com/n/two'), URIRef('http://example.com/other/ns')))
-        gf.dirty(newGraph)
-        gf.flush()
-        wroteContent = open(tf.name, 'rb').read()
-        print(wroteContent)
-        self.assertEqual(b'''@prefix : <http://example.com/> .
-@prefix n: <http://example.com/n/> .
-
-:boo n:two <http://example.com/other/ns> .
-''', wroteContent)
--- /dev/null  Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_files.py  Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,173 @@
+import asyncio
+import dataclasses
+import logging
+from pathlib import Path
+from typing import Callable, Iterable, Optional, Type, Union
+
+import asyncinotify
+from asyncinotify import Event, Inotify, Mask
+
+log = logging.getLogger('watchedfiles')
+
+
+class FileIsGone:
+    pass
+
+
+@dataclasses.dataclass
+class FileEditEvent:
+    path: Path
+    content: Union[str, Type[FileIsGone]]
+
+
+class WatchedFiles:
+    """
+    filenames and text content only; yields FileEditEvents
+    """
+    """queue of file contents that the caller should use"""
+    fileEditEvents: asyncio.Queue[FileEditEvent]
+
+    def __init__(self, dirsToWatch: Iterable[Path], filenameFilter: Callable[[Path], bool]):
+        """watches subtrees of the given dirs"""
+
+        self._notifier = Inotify()
+        self.fileEditEvents = asyncio.Queue()
+
+        log.info("setup watches")
+        print('watchedfiles')
+        for p in dirsToWatch:
+            self._watchTree(p)
+        self._convertEventsTask = asyncio.create_task(self._convertEvents())
+
+    def cancel(self):
+        """tests have to call this but i wish they didn't"""
+        self._convertEventsTask.cancel()
+
+    def writeFile(self, p: Path, content: str):
+
+        tmpOut = Path(str(p) + ".rdfdb-temp")
+        tmpOut.write_text(content)
+
+        self.lastWriteTimestamp = tmpOut.stat().st_mtime
+        tmpOut.rename(p)
+
+        p.write_text(content)
+        # and don't trigger event, etc
+
+    ####################################################################################
+
+    def _watchTree(self, top: Path):
+        self._watchDir(top)
+        for child in top.iterdir():
+            if child.is_dir():
+                self._watchTree(child)
+                continue
+            if self._watchablePath(child):
+                self._genReadEvent(child)  # initial read for existing file
+                self._watchFile(child)
+                continue
+
+    def _addWatch(self, p: Path, mask: Mask):
+        """quietly no-ops upon an existing matching-inode watch"""
+        try:
+            self._notifier.add_watch(p, mask | Mask.MASK_CREATE)
+        except asyncinotify.InotifyError as e:
+            if e.args == ('Call failed, errno 17: File exists',):
+                return
+            log.error(f"trying to watch {p}:")
+            raise
+
+    def _watchFile(self, p: Path):
+        log.info(f'watchFile({p})')
+        # old code said this:
+        # inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
+
+        return
+        self._addWatch(
+            p,
+            mask=(
+                # Mask.ACCESS |  #
+                # Mask.MODIFY |  #
+                Mask.CLOSE_WRITE |  #
+                # Mask.CLOSE_NOWRITE |  #
+                Mask.OPEN |  #
+                Mask.MOVED_FROM |  #
+                Mask.MOVED_TO |  #
+                Mask.CREATE |  #
+                Mask.DELETE |  #
+                Mask.DELETE_SELF |  #
+                Mask.MOVE_SELF |  #
+                Mask.IGNORED |  #
+                0))
+
+    def _watchDir(self, p: Path):
+        log.info(f'watchDir({p})')
+        assert p.is_dir()
+        self._addWatch(
+            p,
+            mask=(
+                # Mask.ACCESS |  #
+                # Mask.CLOSE_NOWRITE |  #
+                # Mask.IGNORED |  #
+                # Mask.MODIFY |  #
+                # Mask.MOVE_SELF |  #
+                # Mask.OPEN |  #
+                Mask.CLOSE_WRITE |  #
+                Mask.CREATE |  #
+                Mask.DELETE |  #
+                Mask.DELETE_SELF |  #
+                Mask.ISDIR |  #
+                Mask.MOVED_FROM |  #
+                Mask.MOVED_TO |  #
+                0))
+
+    async def _convertEvents(self):
+        """get inotify events, emit FileEditEvents"""
+        while True:
+            ev = await self._notifier.get()
+            # log.info(f'got notifier event {ev.path.name if ev.path else None} {ev.mask}')
+
+            self._genEvent(ev, ev.path)
+
+    def _genReadEvent(self, p: Path):
+        if not self._watchablePath(p):
+            return
+        try:
+            content = p.read_text()
+        except FileNotFoundError:
+            log.error(f'while making WatchEvent for file {p}')
+            raise
+        log.info('--> watchevent with content')
+        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=content))
+
+    def _genGoneEvent(self, p: Path):
+        log.info('--> watchevent with FileIsGone')
+        self.fileEditEvents.put_nowait(FileEditEvent(path=p, content=FileIsGone))
+
+    def _watchablePath(self, p: Path):
+        return p.name.endswith('.n3')
+
+    def _genEvent(self, iev: Event, p: Optional[Path]):
+        if p is None:
+            raise TypeError
+        if p.name.endswith('.rdfdb-temp'):
+            return
+        log.info(f'ino callback path={str(p)[-30:]!s:30} watchpath={str(iev.path)[-30:]!s:30} {iev.mask!r}')
+
+        if iev.mask == Mask.DELETE_SELF:
+            self._genGoneEvent(p)
+            if iev.watch is None:
+                raise NotImplementedError
+            self._notifier.rm_watch(iev.watch)
+        elif iev.mask == Mask.CLOSE_WRITE:
+            self._genReadEvent(p)
+        elif iev.mask == Mask.DELETE:
+            self._genGoneEvent(p)
+        elif iev.mask == Mask.ISDIR | Mask.CREATE:
+            self._watchDir(p)
+        elif iev.mask == Mask.MOVED_FROM:
+            if self._watchablePath(p):
+                self._genGoneEvent(p)
+        elif iev.mask == Mask.MOVED_TO:
+            if self._watchablePath(p):
+                self._genReadEvent(p)
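On its own, the file layer is consumed by awaiting its queue. A minimal usage sketch, assuming an existing directory to watch (the path is illustrative; the always-true filenameFilter mirrors the one used in watched_files_test.py below):

    import asyncio
    from pathlib import Path

    from rdfdb.watched_files import FileIsGone, WatchedFiles

    async def watch(dirToWatch: Path):
        # watch the subtree; WatchedGraphs would pass a stricter filter here
        w = WatchedFiles([dirToWatch], filenameFilter=lambda p: True)
        try:
            while True:
                ev = await w.fileEditEvents.get()
                if ev.content is FileIsGone:
                    print(f'{ev.path} was removed')
                else:
                    print(f'{ev.path} now has {len(ev.content)} chars')
        finally:
            w.cancel()

    asyncio.run(watch(Path('data')))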
--- /dev/null  Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_files_test.py  Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,246 @@
+import asyncio
+import logging
+from pathlib import Path
+import pytest
+
+from .watched_files import FileIsGone, WatchedFiles, FileEditEvent
+log = logging.getLogger()
+
+
+async def assert_no_events_coming(q: asyncio.Queue):
+    """
+    see if inotify makes an event or stays quiet
+    """
+    inotify_max_wait_secs = 0.1
+    with pytest.raises(asyncio.TimeoutError):
+        # no events from the empty dir
+        ev = await asyncio.wait_for(q.get(), timeout=inotify_max_wait_secs)
+        log.error(f'expected no events; got {ev}', stacklevel=2)
+
+
+@pytest.mark.asyncio
+async def test_notice_file_exists(tmpdir):
+    p = Path(tmpdir.mkdir("root"))
+    (p / "hello.n3").write_text("# some n3")
+    w = WatchedFiles([Path(p)], filenameFilter=lambda x: True)
+
+    ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+    assert ev == FileEditEvent(path=p / "hello.n3", content="# some n3")
+
+    await assert_no_events_coming(w.fileEditEvents)
+
+    w.cancel()
+
+# @pytest.mark.asyncio
+# async def test_ignore_other_extensions(tmpdir):
+#     p = tmpdir.mkdir("root")
+#     p.join("hello.txt").write("# some other text")
+#     w = WatchedFiles({Path(p): EX})
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_rewrite(tmpdir):
+#     root = tmpdir.mkdir("root")
+#     someFile = root / 'hello.n3'
+#     someFile.write("# some n3")
+
+#     w = WatchedFiles({Path(root): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     log.info('new content write')
+#     someFile.write("# new content")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# new content")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_create(tmpdir):
+#     root = tmpdir.mkdir("root")
+
+#     w = WatchedFiles({Path(root): EX})
+
+#     someFile = root / 'hello.n3'
+#     someFile.write("# some n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_file_delete(tmpdir):
+#     someFile = Path(tmpdir) / 'hello.n3'
+#     someFile.write_text("# some n3")
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some n3")
+
+#     someFile.unlink()
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content=FileIsGone)
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# ##########
+# # subdir
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_file_exists(tmpdir):
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_file_create(tmpdir):
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_notice_subdir_create_and_file_create(tmpdir):
+#     w = WatchedFiles({Path(tmpdir): EX})
+
+#     sub1 = Path(tmpdir) / 'sub1'
+#     sub1.mkdir()
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     someFile = sub1 / 'hello.n3'
+#     someFile.write_text('# some n3')
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'sub1/hello'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# #############
+# # moves
+
+# @pytest.mark.asyncio
+# async def test_file_move_looks_like_delete_then_create(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")
+
+#     (root / "oldname.n3").rename(root / "newname.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some n3")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_to_other_extension_looks_like_delete(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content="# some n3")
+
+#     (root / "oldname.n3").rename(root / "newname.txt")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'oldname'), content=FileIsGone)
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_to_matched_extension_looks_like_create(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "oldname.txt").write_text("# some data")
+
+#     w = WatchedFiles({root: EX})
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     (root / "oldname.txt").rename(root / "newname.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'newname'), content="# some data")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# @pytest.mark.asyncio
+# async def test_move_overwrites_existing_file_like_an_editor_might_save(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "hello.n3").write_text("# some data")
+
+#     w = WatchedFiles({root: EX})
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some data")
+
+#     (root / "hello.n3.new").write_text("# some new data")
+#     (root / "hello.n3.new").rename(root / "hello.n3")
+
+#     ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
+#     assert ev == FileEditEvent(uri=URIRef(EX + 'hello'), content="# some new data")
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+# ##############
+# # writeToFile
+
+# @pytest.mark.asyncio
+# async def test_write_new_file(tmpdir):
+#     root = Path(tmpdir)
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'hello', '# some n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'hello.n3').read_text() == "# some n3"
+
+# @pytest.mark.asyncio
+# async def test_write_new_file_in_new_dirs(tmpdir):
+#     root = Path(tmpdir)
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'a/b/hello', '# some n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'a/b/hello.n3').read_text() == "# some n3"
+
+# @pytest.mark.asyncio
+# async def test_write_over_existing_file(tmpdir):
+#     root = Path(tmpdir)
+#     (root / "hello.n3").write_text("# some n3")
+
+#     w = WatchedFiles({root: EX})
+#     w.writeGraph(EX + 'hello', '# some new n3')
+
+#     await assert_no_events_coming(w.fileEditEvents)
+
+#     assert (root / 'hello.n3').read_text() == "# some new n3"
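The commented-out tests above still target the earlier uri-based interface (WatchedFiles({Path(root): EX}) and FileEditEvent(uri=...)). A hypothetical port of the delete case to the new path-based API used by test_notice_file_exists might look like this sketch; it reuses the module-level imports and assert_no_events_coming helper from the file above, and its interaction with inotify timing is not verified in this changeset:

    @pytest.mark.asyncio
    async def test_notice_file_delete_path_api(tmpdir):
        root = Path(tmpdir.mkdir("root"))
        someFile = root / 'hello.n3'
        someFile.write_text("# some n3")

        w = WatchedFiles([root], filenameFilter=lambda x: True)

        # initial read of the existing file
        ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
        assert ev == FileEditEvent(path=someFile, content="# some n3")

        someFile.unlink()

        # deletion surfaces as a FileIsGone event for the same path
        ev = await asyncio.wait_for(w.fileEditEvents.get(), timeout=1.0)
        assert ev == FileEditEvent(path=someFile, content=FileIsGone)

        await assert_no_events_coming(w.fileEditEvents)
        w.cancel()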
--- /dev/null  Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_graphs.py  Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,149 @@
+import asyncio
+import dataclasses
+import logging
+from pathlib import Path
+import traceback
+from typing import Dict, Iterable, Tuple, cast
+
+from rdflib import Graph, URIRef
+
+from rdfdb.compact_turtle import patchN3SerializerToUseLessWhitespace
+from rdfdb.file_vs_uri import DirUriMap, fileForUri, uriFromFile
+from rdfdb.patch import ALLSTMTS
+from rdfdb.watched_files import FileIsGone, WatchedFiles
+
+patchN3SerializerToUseLessWhitespace()
+
+log = logging.getLogger('watchedgraphs')
+
+
+@dataclasses.dataclass
+class GraphEditEvent:
+    uri: URIRef
+    content: Iterable[Tuple]  # replace subgraph with these stmts
+
+
+class WatchedGraphs:
+    """
+    performs file<->uri mapping; parses/writes fiels; yields GraphEvents.
+    Includes throttling for quick graph updates.
+    """
+
+    def __init__(self, dirUriMap: DirUriMap, addlPrefixes: Dict[str, URIRef]):
+        self._dirUriMap = dirUriMap
+        self.addlPrefixes = addlPrefixes
+        self.graphEditEvents = asyncio.Queue()
+        log.info("setup watches")
+        self._wf = WatchedFiles(dirsToWatch=dirUriMap.keys(), filenameFilter=self._filenameFilter)
+        self._loopTask = asyncio.create_task(self._loop())
+
+    graphEditEvents: asyncio.Queue[GraphEditEvent]
+
+    def writeGraph(self, uri: URIRef, triples: Iterable[Tuple]):
+        """commit this subgraph to disk (soon). triples todo- maybe we want a Graph"""
+        g = Graph()
+        for a, b in self.addlPrefixes.items():
+            g.bind(a, b)
+
+        for t in triples:
+            g.add(t)
+        # something with prefixes..
+        # preserve the prefixes that were in the file
+        # use and remove any addPrefixes ones too
+        n3 = g.serialize(format='n3')
+
+        # older code, prob boring
+        # for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
+        #     self.graphToWrite.bind(p, n)
+        # self.graphToWrite.serialize(destination=f, format='n3', encoding='utf8')
+
+        self._wf.writeFile(fileForUri(self._dirUriMap, uri), n3)
+
+    def cancel(self):
+        self._loopTask.cancel()
+        self._wf.cancel()
+
+    def _filenameFilter(self, p: Path) -> bool:
+        if p.name.startswith('.'):
+            return False
+
+        ext = p.name.rsplit('.', 1)[-1]
+        if ext not in ['.n3']:
+            return False
+
+        if '/capture/' in str(p):
+            # smaller graph for now
+            return False
+
+        # an n3 file with rules makes it all the way past this reading
+        # and the serialization. Then, on the receiving side, a
+        # SyncedGraph calls graphFromNQuad on the incoming data and
+        # has a parse error. I'm not sure where this should be fixed
+        # yet.
+        if '-rules' in str(p):
+            return False
+
+        # for legacy versions, compile all the config stuff you want
+        # read into one file called config.n3. New versions won't read
+        # it.
+        if p.name == "config.n3":
+            return False
+
+        return True
+
+    async def _loop(self):
+        while True:
+            ev = await self._wf.fileEditEvents.get()
+            uri = uriFromFile(self._dirUriMap, ev.path)
+            g = Graph()
+            if ev.content is FileIsGone:
+                pass
+            else:
+                try:
+                    g.parse(data=cast(str, ev.content))  # todo: could fail here
+                except Exception:
+                    traceback.print_exc()
+                    log.warn(f'ignoring this edit to {ev.path}')
+                    continue
+
+            if not g.__len__():
+                continue
+
+            await self.graphEditEvents.put(GraphEditEvent(uri, g.triples(ALLSTMTS)))
+
+    # def aboutToPatch(self, ctx: URIRef):
+    #     """
+    #     warn us that a patch is about to come to this context. it's more
+    #     straightforward to create the new file now
+
+    #     this is meant to make the file before we add triples, so we
+    #     wouldn't see the blank file and lose those triples. But it
+    #     didn't work, so there are other measures that make us not lose
+    #     the triples from a new file. Calling this before patching the
+    #     graph is still a reasonable thing to do, though.
+    #     """
+    #     if ctx not in self.graphFiles:
+    #         outFile = fileForUri(self.dirUriMap, ctx)
+    #         assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
+    #         log.info("starting new file %r", outFile)

    # def _addGraphFile(self, ctx: URIRef, path: Path):
    #     self.addlPrefixes.setdefault(ctx, {})
    #     self.addlPrefixes.setdefault(None, {})
    #     gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
    #     self.graphFiles[ctx] = gf
    #     # fileStats.mappedGraphFiles = len(self.graphFiles)
    #     return gf

    # def dirtyFiles(self, ctxs):
    #     """mark dirty the files that we watch in these contexts.

    #     the ctx might not be a file that we already read; it might be
    #     for a new file we have to create, or it might be for a
    #     transient context that we're not going to save

    #     if it's a ctx with no file, error
    #     """
    #     for ctx in ctxs:
    #         g = self.getSubgraph(ctx)
    #         self.graphFiles[ctx].dirty(g)
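Going the other direction, writeGraph serializes a set of triples with the caller's prefixes and hands the N3 text to WatchedFiles.writeFile via fileForUri. A rough caller sketch, reusing the EX namespace from the tests; the specific triple is invented for illustration, and the target file depends on how fileForUri maps EX['hello'] under the configured dirUriMap:

    from rdflib import Namespace, URIRef

    from rdfdb.watched_graphs import WatchedGraphs

    EX = Namespace('http://example.com/')

    def save_example(wg: WatchedGraphs):
        # replace the on-disk subgraph for EX['hello'] with a single statement
        wg.writeGraph(EX['hello'], [
            (EX['hello'], EX['label'], URIRef('http://example.com/greeting')),
        ])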
--- /dev/null  Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/watched_graphs_test.py  Mon May 30 20:28:17 2022 -0700
@@ -0,0 +1,39 @@
+import asyncio
+import logging
+from pathlib import Path
+
+import pytest
+from rdflib import Namespace
+
+from .watched_graphs import WatchedGraphs
+
+log = logging.getLogger()
+
+EX = Namespace('http://example.com/')
+
+
+async def assert_no_events_coming(q: asyncio.Queue):
+    """
+    see if inotify makes an event or stays quiet
+    """
+    inotify_max_wait_secs = 0.1
+    with pytest.raises(asyncio.TimeoutError):
+        # no events from the empty dir
+        ev = await asyncio.wait_for(q.get(), timeout=inotify_max_wait_secs)
+        log.error(f'expected no events; got {ev}', stacklevel=2)
+
+
+@pytest.mark.asyncio
+async def test_make_graph_from_file(tmpdir):
+    root = Path(tmpdir.mkdir("root"))
+    someFile = root / 'hello.n3'
+    someFile.write_text("# some n3")
+
+    wg = WatchedGraphs({root: EX}, addlPrefixes={})
+
+    ev = await asyncio.wait_for(wg.graphEditEvents.get(), timeout=1.0)
+    assert ev.uri == EX['hello']
+    assert list(ev.content) == []
+
+    await assert_no_events_coming(wg.graphEditEvents)
+    wg.cancel()