rdfdb/watched_graphs.py @ 108:a4a060241c73

add a pydeps task. `inv pydeps`
author drewp@bigasterisk.com
date Mon, 30 May 2022 22:44:50 -0700
parents 22f81cb04da4
children bc643d61bb7c

import asyncio
import dataclasses
import logging
from pathlib import Path
import traceback
from typing import Dict, Iterable, Tuple, cast

from rdflib import Graph, URIRef

from rdfdb.compact_turtle import patchN3SerializerToUseLessWhitespace
from rdfdb.file_vs_uri import DirUriMap, fileForUri, uriFromFile
from rdfdb.patch import ALLSTMTS
from rdfdb.watched_files import FileIsGone, WatchedFiles

patchN3SerializerToUseLessWhitespace()

log = logging.getLogger('watchedgraphs')


@dataclasses.dataclass
class GraphEditEvent:
    uri: URIRef
    content: Iterable[Tuple]  # replace subgraph with these stmts


class WatchedGraphs:
    """
    performs file<->uri mapping; parses/writes fiels; yields GraphEvents.
    Includes throttling for quick graph updates.
    """

    def __init__(self, dirUriMap: DirUriMap, addlPrefixes: Dict[str, URIRef]):
        self._dirUriMap = dirUriMap
        self.addlPrefixes = addlPrefixes
        self.graphEditEvents = asyncio.Queue()
        log.info("setup watches")
        self._wf = WatchedFiles(dirsToWatch=dirUriMap.keys(), filenameFilter=self._filenameFilter)
        self._loopTask = asyncio.create_task(self._loop())

    graphEditEvents: asyncio.Queue[GraphEditEvent]

    def writeGraph(self, uri: URIRef, triples: Iterable[Tuple]):
        """commit this subgraph to disk (soon). triples todo- maybe we want a Graph"""
        g = Graph()
        for a, b in self.addlPrefixes.items():
            g.bind(a, b)

        for t in triples:
            g.add(t)
        # todo: something with prefixes --
        # preserve the prefixes that were in the file,
        # and use (then remove) any addlPrefixes ones too
        n3 = g.serialize(format='n3')

        # older code, prob boring
        # for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
        #     self.graphToWrite.bind(p, n)
        # self.graphToWrite.serialize(destination=f, format='n3', encoding='utf8')

        self._wf.writeFile(fileForUri(self._dirUriMap, uri), n3)

    def cancel(self):
        self._loopTask.cancel()
        self._wf.cancel()

    def _filenameFilter(self, p: Path) -> bool:
        if p.name.startswith('.'):
            return False

        if p.suffix != '.n3':
            return False

        if '/capture/' in str(p):
            # smaller graph for now
            return False

        # an n3 file with rules makes it all the way past this reading
        # and the serialization. Then, on the receiving side, a
        # SyncedGraph calls graphFromNQuad on the incoming data and
        # has a parse error. I'm not sure where this should be fixed
        # yet.
        if '-rules' in str(p):
            return False

        # for legacy versions, compile all the config stuff you want
        # read into one file called config.n3. New versions won't read
        # it.
        if p.name == "config.n3":
            return False

        return True
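
    # Illustrative outcomes of _filenameFilter (hypothetical paths, assuming the
    # checks above):
    #   graphs/demo.n3        -> watched
    #   graphs/.demo.n3.swp   -> skipped (dotfile)
    #   graphs/demo.ttl       -> skipped (not .n3)
    #   graphs/capture/a.n3   -> skipped (capture dir)
    #   graphs/demo-rules.n3  -> skipped (rules file)
    #   graphs/config.n3      -> skipped (legacy config)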

    async def _loop(self):
        while True:
            ev = await self._wf.fileEditEvents.get()
            uri = uriFromFile(self._dirUriMap, ev.path)
            g = Graph()
            if ev.content is FileIsGone:
                pass
            else:
                try:
                    g.parse(data=cast(str, ev.content))
                except Exception:
                    traceback.print_exc()
                    log.warning(f'ignoring this edit to {ev.path}')
                    continue

            if not len(g):
                continue

            await self.graphEditEvents.put(GraphEditEvent(uri, g.triples(ALLSTMTS)))

    # def aboutToPatch(self, ctx: URIRef):
    #     """
    #     warn us that a patch is about to come to this context. it's more
    #     straightforward to create the new file now

    #     this is meant to make the file before we add triples, so we
    #     wouldn't see the blank file and lose those triples. But it
    #     didn't work, so there are other measures that make us not lose
    #     the triples from a new file. Calling this before patching the
    #     graph is still a reasonable thing to do, though.
    #     """
    #     if ctx not in self.graphFiles:
    #         outFile = fileForUri(self.dirUriMap, ctx)
    #         assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
    #         log.info("starting new file %r", outFile)

    # def _addGraphFile(self, ctx: URIRef, path: Path):
    #     self.addlPrefixes.setdefault(ctx, {})
    #     self.addlPrefixes.setdefault(None, {})
    #     gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
    #     self.graphFiles[ctx] = gf
    #     # fileStats.mappedGraphFiles = len(self.graphFiles)
    #     return gf

    # def dirtyFiles(self, ctxs):
    #     """mark dirty the files that we watch in these contexts.

    #     the ctx might not be a file that we already read; it might be
    #     for a new file we have to create, or it might be for a
    #     transient context that we're not going to save

    #     if it's a ctx with no file, error
    #     """
    #     for ctx in ctxs:
    #         g = self.getSubgraph(ctx)
    #         self.graphFiles[ctx].dirty(g)