Files @ 4556eebe5d73
Branch filter:

Location: light9/src/light9/mock_syncedgraph.py

drewp@bigasterisk.com
topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
from rdflib import Graph, RDF, RDFS
from rdflib.parser import StringInputSource
from rdfdb.syncedgraph.syncedgraph import SyncedGraph


class MockSyncedGraph(SyncedGraph):
    """
    Lets users of SyncedGraph mostly work. Doesn't yet help with any
    testing of the rerun-upon-graph-change behavior.
    """

    def __init__(self, n3Content):
        self._graph = Graph()
        self._graph.parse(StringInputSource(n3Content), format='n3')

    def addHandler(self, func):
        func()

    def value(self,
              subject=None,
              predicate=RDF.value,
              object=None,
              default=None,
              any=True):
        if object is not None:
            raise NotImplementedError()
        return self._graph.value(subject,
                                 predicate,
                                 object=object,
                                 default=default,
                                 any=any)

    def objects(self, subject=None, predicate=None):
        return self._graph.objects(subject, predicate)

    def label(self, uri):
        return self.value(uri, RDFS.label)

    def subjects(self, predicate=None, object=None):
        return self._graph.subjects(predicate, object)

    def predicate_objects(self, subject):
        return self._graph.predicate_objects(subject)

    def items(self, listUri):
        """generator. Having a chain of watchers on the results is not
        well-tested yet"""
        chain = set([listUri])
        while listUri:
            item = self.value(listUri, RDF.first)
            if item:
                yield item
            listUri = self.value(listUri, RDF.rest)
            if listUri in chain:
                raise ValueError("List contains a recursive rdf:rest reference")
            chain.add(listUri)

    def contains(self, triple):
        return triple in self._graph