diff --git a/light9/localsyncedgraph.py b/light9/localsyncedgraph.py new file mode 100644 --- /dev/null +++ b/light9/localsyncedgraph.py @@ -0,0 +1,19 @@ +from rdflib import ConjunctiveGraph + +from rdfdb.syncedgraph.currentstategraphapi import CurrentStateGraphApi +from rdfdb.syncedgraph.autodepgraphapi import AutoDepGraphApi +from rdfdb.syncedgraph.grapheditapi import GraphEditApi +from rdfdb.rdflibpatch import patchQuads + + +class LocalSyncedGraph(AutoDepGraphApi, GraphEditApi): + """for tests""" + + def __init__(self, files=None): + self._graph = ConjunctiveGraph() + for f in files or []: + self._graph.parse(f, format='n3') + + def patch(self, p): + patchQuads(self._graph, deleteQuads=p.delQuads, addQuads=p.addQuads, perfect=True) + # no deps diff --git a/light9/mock_syncedgraph.py b/light9/mock_syncedgraph.py new file mode 100644 --- /dev/null +++ b/light9/mock_syncedgraph.py @@ -0,0 +1,59 @@ +from rdflib import Graph, RDF, RDFS +from rdflib.parser import StringInputSource +from rdfdb.syncedgraph.syncedgraph import SyncedGraph + + +class MockSyncedGraph(SyncedGraph): + """ + Lets users of SyncedGraph mostly work. Doesn't yet help with any + testing of the rerun-upon-graph-change behavior. + """ + + def __init__(self, n3Content): + self._graph = Graph() + self._graph.parse(StringInputSource(n3Content), format='n3') + + def addHandler(self, func): + func() + + def value(self, + subject=None, + predicate=RDF.value, + object=None, + default=None, + any=True): + if object is not None: + raise NotImplementedError() + return self._graph.value(subject, + predicate, + object=object, + default=default, + any=any) + + def objects(self, subject=None, predicate=None): + return self._graph.objects(subject, predicate) + + def label(self, uri): + return self.value(uri, RDFS.label) + + def subjects(self, predicate=None, object=None): + return self._graph.subjects(predicate, object) + + def predicate_objects(self, subject): + return self._graph.predicate_objects(subject) + + def items(self, listUri): + """generator. Having a chain of watchers on the results is not + well-tested yet""" + chain = set([listUri]) + while listUri: + item = self.value(listUri, RDF.first) + if item: + yield item + listUri = self.value(listUri, RDF.rest) + if listUri in chain: + raise ValueError("List contains a recursive rdf:rest reference") + chain.add(listUri) + + def contains(self, triple): + return triple in self._graph