view rdfdb/watched_graphs.py @ 108:a4a060241c73
add a pydeps task. `inv pydeps`
| author | drewp@bigasterisk.com |
|---|---|
| date | Mon, 30 May 2022 22:44:50 -0700 |
| parents | 22f81cb04da4 |
| children | bc643d61bb7c |
import asyncio
import dataclasses
import logging
import traceback
from pathlib import Path
from typing import Dict, Iterable, Tuple, cast

from rdflib import Graph, URIRef

from rdfdb.compact_turtle import patchN3SerializerToUseLessWhitespace
from rdfdb.file_vs_uri import DirUriMap, fileForUri, uriFromFile
from rdfdb.patch import ALLSTMTS
from rdfdb.watched_files import FileIsGone, WatchedFiles

patchN3SerializerToUseLessWhitespace()
log = logging.getLogger('watchedgraphs')


@dataclasses.dataclass
class GraphEditEvent:
    uri: URIRef
    content: Iterable[Tuple]  # replace subgraph with these stmts


class WatchedGraphs:
    """
    performs file<->uri mapping; parses/writes files; yields GraphEditEvents.
    Includes throttling for quick graph updates.
    """

    def __init__(self, dirUriMap: DirUriMap, addlPrefixes: Dict[str, URIRef]):
        self._dirUriMap = dirUriMap
        self.addlPrefixes = addlPrefixes
        self.graphEditEvents = asyncio.Queue()
        log.info("setup watches")
        self._wf = WatchedFiles(dirsToWatch=dirUriMap.keys(), filenameFilter=self._filenameFilter)
        self._loopTask = asyncio.create_task(self._loop())

    graphEditEvents: asyncio.Queue[GraphEditEvent]

    def writeGraph(self, uri: URIRef, triples: Iterable[Tuple]):
        """commit this subgraph to disk (soon). todo: maybe `triples` should be a Graph"""
        g = Graph()
        for a, b in self.addlPrefixes.items():
            g.bind(a, b)
        for t in triples:
            g.add(t)

        # todo: something with prefixes--
        # preserve the prefixes that were in the file;
        # use and remove any addlPrefixes ones too
        n3 = g.serialize(format='n3')

        # older code, prob boring
        # for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
        #     self.graphToWrite.bind(p, n)
        # self.graphToWrite.serialize(destination=f, format='n3', encoding='utf8')

        self._wf.writeFile(fileForUri(self._dirUriMap, uri), n3)

    def cancel(self):
        self._loopTask.cancel()
        self._wf.cancel()

    def _filenameFilter(self, p: Path) -> bool:
        if p.name.startswith('.'):
            return False

        ext = p.name.rsplit('.', 1)[-1]
        if ext not in ['n3']:  # rsplit drops the dot, so compare against 'n3'
            return False

        if '/capture/' in str(p):
            # smaller graph for now
            return False

        # an n3 file with rules makes it all the way past this reading
        # and the serialization. Then, on the receiving side, a
        # SyncedGraph calls graphFromNQuad on the incoming data and
        # has a parse error. I'm not sure where this should be fixed
        # yet.
        if '-rules' in str(p):
            return False

        # for legacy versions, compile all the config stuff you want
        # read into one file called config.n3. New versions won't read
        # it.
        if p.name == "config.n3":
            return False

        return True

    async def _loop(self):
        while True:
            ev = await self._wf.fileEditEvents.get()
            uri = uriFromFile(self._dirUriMap, ev.path)
            g = Graph()
            if ev.content is FileIsGone:
                pass
            else:
                try:
                    g.parse(data=cast(str, ev.content))  # todo: could fail here
                except Exception:
                    traceback.print_exc()
                    log.warning(f'ignoring this edit to {ev.path}')
                    continue
            if not len(g):
                continue
            await self.graphEditEvents.put(GraphEditEvent(uri, g.triples(ALLSTMTS)))

    # def aboutToPatch(self, ctx: URIRef):
    #     """
    #     warn us that a patch is about to come to this context. it's more
    #     straightforward to create the new file now
    #
    #     this is meant to make the file before we add triples, so we
    #     wouldn't see the blank file and lose those triples. But it
    #     didn't work, so there are other measures that make us not lose
    #     the triples from a new file. Calling this before patching the
    #     graph is still a reasonable thing to do, though.
    #     """
    #     if ctx not in self.graphFiles:
    #         outFile = fileForUri(self.dirUriMap, ctx)
    #         assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
    #         log.info("starting new file %r", outFile)

    # def _addGraphFile(self, ctx: URIRef, path: Path):
    #     self.addlPrefixes.setdefault(ctx, {})
    #     self.addlPrefixes.setdefault(None, {})
    #
    #     gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph,
    #                    globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
    #     self.graphFiles[ctx] = gf
    #     # fileStats.mappedGraphFiles = len(self.graphFiles)
    #     return gf

    # def dirtyFiles(self, ctxs):
    #     """mark dirty the files that we watch in these contexts.
    #
    #     the ctx might not be a file that we already read; it might be
    #     for a new file we have to create, or it might be for a
    #     transient context that we're not going to save
    #
    #     if it's a ctx with no file, error
    #     """
    #     for ctx in ctxs:
    #         g = self.getSubgraph(ctx)
    #         self.graphFiles[ctx].dirty(g)