Mercurial > code > home > repos > rdfdb
view rdfdb/mock_syncedgraph.py @ 37:9b4c551dddfb
fix registration bug (tried to send bytes to json encoder)
Ignore-this: 166816604c34341844b4b4bb0618ff71
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Sat, 25 May 2019 23:54:54 +0000 |
parents | d487d597ad33 |
children | dc61012eeace |
line wrap: on
line source
from rdflib import Graph, RDF, RDFS from rdflib.parser import StringInputSource class MockSyncedGraph(object): """ Lets users of SyncedGraph mostly work. Doesn't yet help with any testing of the rerun-upon-graph-change behavior. """ def __init__(self, n3Content): self._graph = Graph() self._graph.parse(StringInputSource(n3Content), format='n3') def addHandler(self, func): func() def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True): if object is not None: raise NotImplementedError() return self._graph.value(subject, predicate, object=object, default=default, any=any) def objects(self, subject=None, predicate=None): return self._graph.objects(subject, predicate) def label(self, uri): return self.value(uri, RDFS.label) def subjects(self, predicate=None, object=None): return self._graph.subjects(predicate, object) def predicate_objects(self, subject): return self._graph.predicate_objects(subject) def items(self, listUri): """generator. Having a chain of watchers on the results is not well-tested yet""" chain = set([listUri]) while listUri: item = self.value(listUri, RDF.first) if item: yield item listUri = self.value(listUri, RDF.rest) if listUri in chain: raise ValueError("List contains a recursive rdf:rest reference") chain.add(listUri) def contains(self, triple): return triple in self._graph