Mercurial > code > home > repos > rdfdb
changeset 94:8bc63c7b619b
mv syncedgraph to subdir
author | drewp@bigasterisk.com |
---|---|
date | Sat, 21 May 2022 22:15:14 -0700 |
parents | f22723264489 |
children | 270f39920b30 |
files | rdfdb/autodepgraphapi.py rdfdb/currentstategraphapi.py rdfdb/currentstategraphapi_test.py rdfdb/grapheditapi.py rdfdb/grapheditapi_test.py rdfdb/mock_syncedgraph.py rdfdb/readonly_graph.py rdfdb/syncedgraph.py rdfdb/syncedgraph/README.md rdfdb/syncedgraph/__init__.py rdfdb/syncedgraph/autodepgraphapi.py rdfdb/syncedgraph/currentstategraphapi.py rdfdb/syncedgraph/currentstategraphapi_test.py rdfdb/syncedgraph/grapheditapi.py rdfdb/syncedgraph/grapheditapi_test.py rdfdb/syncedgraph/mock_syncedgraph.py rdfdb/syncedgraph/readonly_graph.py rdfdb/syncedgraph/syncedgraph.py rdfdb/syncedgraph/syncedgraph_base.py rdfdb/syncedgraph_base.py |
diffstat | 19 files changed, 800 insertions(+), 796 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/autodepgraphapi.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,224 +0,0 @@ -import logging -from typing import Callable, Dict, List, Set, Tuple -from rdfdb.syncedgraph_base import SyncedGraphBase - -from rdflib import RDF, RDFS, URIRef - -from rdfdb.currentstategraphapi import contextsForStatementNoWildcards - -log = logging.getLogger('autodepgraphapi') - - -class AutoDepGraphApi(SyncedGraphBase): - """ - knockoutjs-inspired API for automatically building a dependency - tree while reading the graph. See addHandler(). - - This design is more aggressive than knockoutjs, since the obvious - query methods like value() all error if you're not in a watched - section of code. This is supposed to make it easier to notice - dependency mistakes, especially when porting old code to use - SyncedGraph. - - This class is a mixin for SyncedGraph, separated here because - these methods work together - """ - - def __init__(self): - self._watchers = _GraphWatchers() - self.currentFuncs: List[Callable[[], None]] = [] # stack of addHandler callers - - def addHandler(self, func: Callable[[], None]) -> None: - """ - run this (idempotent) func, noting what graph values it - uses. Run it again in the future if there are changes to those - graph values. The func might use different values during that - future call, and those will be what we watch for next. - """ - - # if we saw this func before, we need to forget the old - # callbacks it wanted and replace with the new ones we see - # now. - - # if one handler func calls another, does that break anything? - # maybe not? - - # no plan for sparql queries yet. Hook into a lower layer that - # reveals all their statement fetches? Just make them always - # new? Cache their results, so if i make the query again and - # it gives the same result, I don't call the handler? - - self.currentFuncs.append(func) - log.debug('graph.currentFuncs push %s', func) - try: - try: - func() - except: - import traceback - traceback.print_exc() - raise - finally: - self.currentFuncs.pop() - log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs)) - - def runDepsOnNewPatch(self, p): - """ - patch p just happened to the graph; call everyone back who - might care, and then notice what data they depend on now - """ - for func in self._watchers.whoCares(p): - # todo: forget the old handlers for this func - log.debug('runDepsOnNewPatch calling watcher %s', p) - self.addHandler(func) - - def _getCurrentFunc(self): - if not self.currentFuncs: - # this may become a warning later - raise ValueError("asked for graph data outside of a handler") - - # we add the watcher to the deepest function, since that - # should be the cheapest way to update when this part of the - # data changes - return self.currentFuncs[-1] - - # these just call through to triples() so it might be possible to - # watch just that one. - - # if you get a bnode in your response, maybe the answer to - # dependency tracking is to say that you depended on the triple - # that got you that bnode, since it is likely to change to another - # bnode later. This won't work if the receiver stores bnodes - # between calls, but probably most of them don't do that (they - # work from a starting uri) - - def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True): - if object is not None: - raise NotImplementedError() - func = self._getCurrentFunc() - self._watchers.addSubjPredWatcher(func, subject, predicate) - return self._graph.value(subject, predicate, object=object, default=default, any=any) - - def objects(self, subject=None, predicate=None): - func = self._getCurrentFunc() - self._watchers.addSubjPredWatcher(func, subject, predicate) - return self._graph.objects(subject, predicate) - - def label(self, uri): - return self.value(uri, RDFS.label) - - def subjects(self, predicate=None, object=None): - func = self._getCurrentFunc() - self._watchers.addPredObjWatcher(func, predicate, object) - return self._graph.subjects(predicate, object) - - def predicate_objects(self, subject): - func = self._getCurrentFunc() - self._watchers.addSubjectWatcher(func, subject) - return self._graph.predicate_objects(subject) - - def items(self, listUri): - """generator. Having a chain of watchers on the results is not - well-tested yet""" - chain = set([listUri]) - while listUri: - item = self.value(listUri, RDF.first) - if item: - yield item - listUri = self.value(listUri, RDF.rest) - if listUri in chain: - raise ValueError("List contains a recursive rdf:rest reference") - chain.add(listUri) - - def contains(self, triple): - func = self._getCurrentFunc() - self._watchers.addTripleWatcher(func, triple) - return triple in self._graph - - def contextsForStatement(self, triple): - """currently this needs to be in an addHandler section, but it - sets no watchers so it won't actually update if the statement - was added or dropped from contexts""" - # func = self._getCurrentFunc() - return contextsForStatementNoWildcards(self._graph, triple) - - # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell - # you only what results have just appeared or disappeared. I think - # I'm going to be repeating that logic a lot. Maybe just for the - # subjects(RDF.type, t) call - - -HandlerSet = Set[Callable[[], None]] - - -class _GraphWatchers(object): - """ - store the current handlers that care about graph changes - """ - - def __init__(self): - self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (s,p): set(handlers) - self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (p,o): set(handlers) - self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {} # (s,p,o): set(handlers) - self._handlersS: Dict[URIRef, HandlerSet] = {} # s: set(handlers) - - def addSubjPredWatcher(self, func, s, p): - if func is None: - return - key = s, p - try: - self._handlersSp.setdefault(key, set()).add(func) - except Exception: - log.error("with key %r and func %r" % (key, func)) - raise - - def addPredObjWatcher(self, func, p, o): - self._handlersPo.setdefault((p, o), set()).add(func) - - def addTripleWatcher(self, func, triple): - self._handlersSpo.setdefault(triple, set()).add(func) - - def addSubjectWatcher(self, func, s): - self._handlersS.setdefault(s, set()).add(func) - - def whoCares(self, patch): - """what handler functions would care about the changes in this patch? - - this removes the handlers that it gives you - """ - # self.dependencies() - ret: Set[Callable[[], None]] = set() - affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads]) - for (s, p), funcs in self._handlersSp.items(): - if (s, p) in affectedSubjPreds: - ret.update(funcs) - funcs.clear() - - affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads] + [(p, o) for s, p, o, c in patch.delQuads]) - for (p, o), funcs in self._handlersPo.items(): - if (p, o) in affectedPredObjs: - ret.update(funcs) - funcs.clear() - - affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads] + [(s, p, o) for s, p, o, c in patch.delQuads]) - for triple, funcs in self._handlersSpo.items(): - if triple in affectedTriples: - ret.update(funcs) - funcs.clear() - - affectedSubjs = set([s for s, p, o, c in patch.addQuads] + [s for s, p, o, c in patch.delQuads]) - for subj, funcs in self._handlersS.items(): - if subj in affectedSubjs: - ret.update(funcs) - funcs.clear() - - return ret - - def dependencies(self): - """ - for debugging, make a list of all the active handlers and what - data they depend on. This is meant for showing on the web ui - for browsing. - """ - log.info("whocares:") - from pprint import pprint - pprint(self._handlersSp)
--- a/rdfdb/currentstategraphapi.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,93 +0,0 @@ -import itertools -import logging -import time -import traceback -from typing import Optional, Set, Tuple - -from rdflib import ConjunctiveGraph, URIRef -from rdflib.term import Node - -from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement -from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph -from rdfdb.syncedgraph_base import SyncedGraphBase - -log = logging.getLogger("currentstate") - -TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]] - - -class Mgr(object): - - def __init__(self, graph, tripleFilter): - self._graph = graph - self._tripleFilter = tripleFilter - - def __enter__(self): - # this should be a readonly view of the existing - # graph, maybe with something to guard against - # writes/patches happening while reads are being - # done. Typical usage will do some reads on this graph - # before moving on to writes. - - if True: - g = ReadOnlyConjunctiveGraph(self._graph) - else: - t1 = time.time() - g = ConjunctiveGraph() - for s, p, o, c in self._graph.quads(self._tripleFilter): - g.store.add((s, p, o), c) - - if self._tripleFilter == (None, None, None): - self.logThisCopy(g, time.time() - t1) - - g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt) - return g - - def logThisCopy(self, g, sec): - log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000)) - for frame in traceback.format_stack(limit=4)[:-2]: - for line in frame.splitlines(): - log.info(" " + line) - - def __exit__(self, type, val, tb): - return - - -class CurrentStateGraphApi(SyncedGraphBase): - """ - mixin for SyncedGraph, separated here because these methods work together - """ - - def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr: - """ - a graph you can read without being in an addHandler - - you can save some time by passing a triple filter, and we'll only give you the matching triples - """ - if context is not None: - raise NotImplementedError("currentState with context arg") - - return Mgr(self._graph, tripleFilter) - - _reservedSequentials: Optional[Set[URIRef]] = None - - def sequentialUri(self, prefix: URIRef) -> URIRef: - """ - Prefix URIRef like http://example.com/r- will return - http://example.com/r-1 if that uri is not a subject in the graph, - or else http://example.com/r-2, etc - """ - if self._reservedSequentials is None: - self._reservedSequentials = set() - for i in itertools.count(1): - newUri = URIRef(prefix + str(i)) - if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))): - self._reservedSequentials.add(newUri) - return newUri - raise NotImplementedError("never reached") - - -def contextsForStatementNoWildcards(g, triple): - if None in triple: - raise NotImplementedError("no wildcards") - return rp_contextsForStatement(g, triple)
--- a/rdfdb/currentstategraphapi_test.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,13 +0,0 @@ -import unittest - -from rdflib import URIRef - -from rdfdb.syncedgraph import SyncedGraph - - -class TestSequentialUri(unittest.TestCase): - - def test_returnsSequentialUris(self): - g = SyncedGraph(URIRef('http://example.com/db/'), label='test') - self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo1')) - self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo2'))
--- a/rdfdb/grapheditapi.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,104 +0,0 @@ -import logging -import random -from itertools import chain -from rdfdb.currentstategraphapi import CurrentStateGraphApi -from rdfdb.syncedgraph_base import SyncedGraphBase - -from rdflib import RDF, URIRef -from rdflib.term import Node - -from rdfdb.patch import Patch, quadsWithContextUris - -log = logging.getLogger('graphedit') - - -class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase): - """ - fancier graph edits - - mixin for SyncedGraph, separated here because these methods work together - """ - - def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch: - """send a patch which removes existing values for (s,p,*,c) - and adds (s,p,newObject,c). Values in other graphs are not affected. - - newObject can be None, which will remove all (subj,pred,*) statements. - """ - - existing = [] - for spoc in quadsWithContextUris(self._graph.quads((subject, predicate, None, context))): - existing.append(spoc) - toAdd = ([(subject, predicate, newObject, context)] if newObject is not None else []) - return Patch(delQuads=existing, addQuads=toAdd).simplify() - - def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node): - p = self.getObjectPatch(context, subject, predicate, newObject) - if not p.isNoop(): - log.debug("patchObject %r" % p.jsonRepr) - self.patch(p) # type: ignore - - def patchSubgraph(self, context, newGraph): - """ - replace all statements in 'context' with the quads in newGraph. - This is not cooperating with currentState. - """ - old = set(quadsWithContextUris(self._graph.quads((None, None, None, context)))) - new = set(quadsWithContextUris(newGraph)) - p = Patch(delQuads=old - new, addQuads=new - old) - self.patch(p) - return p # for debugging - - def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue): - """ - creates/updates a structure like this: - - ?subject ?predicate [ - a ?nodeClass; - ?keyPred ?newKey; - ?valuePred ?newValue ] . - - There should be a complementary readMapping that gets you a - value since that's tricky too - """ - - # as long as currentState is expensive and has the - # tripleFilter optimization, this looks like a mess. If - # currentState became cheap, a lot of code here could go away. - - with self.currentState(tripleFilter=(subject, predicate, None)) as current: - adds = set([]) - for setting in current.objects(subject, predicate): - with self.currentState(tripleFilter=(setting, keyPred, None)) as current2: - - match = current2.value(setting, keyPred) == newKey - if match: - break - else: - setting = URIRef(subject + "/map/%s" % random.randrange(999999999)) - adds.update([ - (subject, predicate, setting, context), - (setting, RDF.type, nodeClass, context), - (setting, keyPred, newKey, context), - ]) - - with self.currentState(tripleFilter=(setting, valuePred, None)) as current: - dels = set([]) - for prev in current.objects(setting, valuePred): - dels.add((setting, valuePred, prev, context)) - adds.add((setting, valuePred, newValue, context)) - - if adds != dels: - self.patch(Patch(delQuads=dels, addQuads=adds)) - - def removeMappingNode(self, context, node): - """ - removes the statements with this node as subject or object, which - is the right amount of statements to remove a node that - patchMapping made. - """ - p = Patch(delQuads=[ - spo + (context,) - for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context)) - ]) - self.patch(p)
--- a/rdfdb/grapheditapi_test.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,30 +0,0 @@ -import unittest - -from rdflib import ConjunctiveGraph, URIRef - -from rdfdb.grapheditapi import GraphEditApi - - -class TestPatchSubgraph(unittest.TestCase): - - def testCollapsesIdenticalQuads(self): - appliedPatches = [] - - class Obj(GraphEditApi): - def __init__(self): - pass - - def patch(self, p): - appliedPatches.append(p) - - _graph: ConjunctiveGraph - - obj = Obj() - obj._graph = ConjunctiveGraph() - stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g')) - obj._graph.addN([stmt1]) - obj.patchSubgraph(URIRef('g'), [stmt1]) - self.assertEqual(len(appliedPatches), 1) - p = appliedPatches[0] - self.assertTrue(p.isNoop()) - self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- a/rdfdb/mock_syncedgraph.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,49 +0,0 @@ -from rdflib import RDF, RDFS, Graph -from rdflib.parser import StringInputSource - - -class MockSyncedGraph(object): - """ - Lets users of SyncedGraph mostly work. Doesn't yet help with any - testing of the rerun-upon-graph-change behavior. - """ - - def __init__(self, n3Content): - self._graph = Graph() - self._graph.parse(StringInputSource(n3Content), format='n3') - - def addHandler(self, func): - func() - - def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True): - if object is not None: - raise NotImplementedError() - return self._graph.value(subject, predicate, object=object, default=default, any=any) - - def objects(self, subject=None, predicate=None): - return self._graph.objects(subject, predicate) - - def label(self, uri): - return self.value(uri, RDFS.label) - - def subjects(self, predicate=None, object=None): - return self._graph.subjects(predicate, object) - - def predicate_objects(self, subject): - return self._graph.predicate_objects(subject) - - def items(self, listUri): - """generator. Having a chain of watchers on the results is not - well-tested yet""" - chain = set([listUri]) - while listUri: - item = self.value(listUri, RDF.first) - if item: - yield item - listUri = self.value(listUri, RDF.rest) - if listUri in chain: - raise ValueError("List contains a recursive rdf:rest reference") - chain.add(listUri) - - def contains(self, triple): - return triple in self._graph
--- a/rdfdb/readonly_graph.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,17 +0,0 @@ -class ReadOnlyConjunctiveGraph(object): - """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead - of a bunch of Graphs""" - - def __init__(self, graph): - self.graph = graph - - def __getattr__(self, attr): - if attr in ['subjects', 'value', 'objects', 'triples', 'label']: # not complete - return getattr(self.graph, attr) - raise TypeError("can't access %r of read-only graph" % attr) - - def __len__(self): - return len(self.graph) - - def contextsForStatement(self, stmt): - raise NotImplementedError
--- a/rdfdb/syncedgraph.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,40 +0,0 @@ -""" -client code uses a SyncedGraph, which has a few things: - -AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a -way that lets me call you again when there were changes to the things -you queried - -CurrentStateGraphApi - a way to query the graph that doesn't gather -your dependencies like AutoDepGraphApi does - -GraphEditApi - methods to write patches to the graph for common -operations, e.g. replacing a value, or editing a mapping - -WsClientProtocol one connection with the rdfdb server. -""" - -from typing import Optional -from rdfdb.autodepgraphapi import AutoDepGraphApi -from rdfdb.grapheditapi import GraphEditApi -# everybody who writes literals needs to get this -from rdfdb.rdflibpatch_literal import patch -from rdfdb.syncedgraph_base import SyncedGraphBase -from rdflib import URIRef -patch() - - -class SyncedGraph(AutoDepGraphApi, GraphEditApi): - ''' - SyncedGraphBase - | | - CurState AutoDep - | | - GraphEdit | - | | - SyncedGraph - ''' - def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): - SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost) - AutoDepGraphApi.__init__(self) -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/README.md Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,1 @@ +client for rdfdb.service that sends/receives patches over a websocket connection \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/autodepgraphapi.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,224 @@ +import logging +from typing import Callable, Dict, List, Set, Tuple + +from rdfdb.syncedgraph.currentstategraphapi import \ + contextsForStatementNoWildcards +from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase +from rdflib import RDF, RDFS, URIRef + +log = logging.getLogger('autodepgraphapi') + + +class AutoDepGraphApi(SyncedGraphBase): + """ + knockoutjs-inspired API for automatically building a dependency + tree while reading the graph. See addHandler(). + + This design is more aggressive than knockoutjs, since the obvious + query methods like value() all error if you're not in a watched + section of code. This is supposed to make it easier to notice + dependency mistakes, especially when porting old code to use + SyncedGraph. + + This class is a mixin for SyncedGraph, separated here because + these methods work together + """ + + def __init__(self): + self._watchers = _GraphWatchers() + self.currentFuncs: List[Callable[[], None]] = [] # stack of addHandler callers + + def addHandler(self, func: Callable[[], None]) -> None: + """ + run this (idempotent) func, noting what graph values it + uses. Run it again in the future if there are changes to those + graph values. The func might use different values during that + future call, and those will be what we watch for next. + """ + + # if we saw this func before, we need to forget the old + # callbacks it wanted and replace with the new ones we see + # now. + + # if one handler func calls another, does that break anything? + # maybe not? + + # no plan for sparql queries yet. Hook into a lower layer that + # reveals all their statement fetches? Just make them always + # new? Cache their results, so if i make the query again and + # it gives the same result, I don't call the handler? + + self.currentFuncs.append(func) + log.debug('graph.currentFuncs push %s', func) + try: + try: + func() + except: + import traceback + traceback.print_exc() + raise + finally: + self.currentFuncs.pop() + log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs)) + + def runDepsOnNewPatch(self, p): + """ + patch p just happened to the graph; call everyone back who + might care, and then notice what data they depend on now + """ + for func in self._watchers.whoCares(p): + # todo: forget the old handlers for this func + log.debug('runDepsOnNewPatch calling watcher %s', p) + self.addHandler(func) + + def _getCurrentFunc(self): + if not self.currentFuncs: + # this may become a warning later + raise ValueError("asked for graph data outside of a handler") + + # we add the watcher to the deepest function, since that + # should be the cheapest way to update when this part of the + # data changes + return self.currentFuncs[-1] + + # these just call through to triples() so it might be possible to + # watch just that one. + + # if you get a bnode in your response, maybe the answer to + # dependency tracking is to say that you depended on the triple + # that got you that bnode, since it is likely to change to another + # bnode later. This won't work if the receiver stores bnodes + # between calls, but probably most of them don't do that (they + # work from a starting uri) + + def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True): + if object is not None: + raise NotImplementedError() + func = self._getCurrentFunc() + self._watchers.addSubjPredWatcher(func, subject, predicate) + return self._graph.value(subject, predicate, object=object, default=default, any=any) + + def objects(self, subject=None, predicate=None): + func = self._getCurrentFunc() + self._watchers.addSubjPredWatcher(func, subject, predicate) + return self._graph.objects(subject, predicate) + + def label(self, uri): + return self.value(uri, RDFS.label) + + def subjects(self, predicate=None, object=None): + func = self._getCurrentFunc() + self._watchers.addPredObjWatcher(func, predicate, object) + return self._graph.subjects(predicate, object) + + def predicate_objects(self, subject): + func = self._getCurrentFunc() + self._watchers.addSubjectWatcher(func, subject) + return self._graph.predicate_objects(subject) + + def items(self, listUri): + """generator. Having a chain of watchers on the results is not + well-tested yet""" + chain = set([listUri]) + while listUri: + item = self.value(listUri, RDF.first) + if item: + yield item + listUri = self.value(listUri, RDF.rest) + if listUri in chain: + raise ValueError("List contains a recursive rdf:rest reference") + chain.add(listUri) + + def contains(self, triple): + func = self._getCurrentFunc() + self._watchers.addTripleWatcher(func, triple) + return triple in self._graph + + def contextsForStatement(self, triple): + """currently this needs to be in an addHandler section, but it + sets no watchers so it won't actually update if the statement + was added or dropped from contexts""" + # func = self._getCurrentFunc() + return contextsForStatementNoWildcards(self._graph, triple) + + # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell + # you only what results have just appeared or disappeared. I think + # I'm going to be repeating that logic a lot. Maybe just for the + # subjects(RDF.type, t) call + + +HandlerSet = Set[Callable[[], None]] + + +class _GraphWatchers(object): + """ + store the current handlers that care about graph changes + """ + + def __init__(self): + self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (s,p): set(handlers) + self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (p,o): set(handlers) + self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {} # (s,p,o): set(handlers) + self._handlersS: Dict[URIRef, HandlerSet] = {} # s: set(handlers) + + def addSubjPredWatcher(self, func, s, p): + if func is None: + return + key = s, p + try: + self._handlersSp.setdefault(key, set()).add(func) + except Exception: + log.error("with key %r and func %r" % (key, func)) + raise + + def addPredObjWatcher(self, func, p, o): + self._handlersPo.setdefault((p, o), set()).add(func) + + def addTripleWatcher(self, func, triple): + self._handlersSpo.setdefault(triple, set()).add(func) + + def addSubjectWatcher(self, func, s): + self._handlersS.setdefault(s, set()).add(func) + + def whoCares(self, patch): + """what handler functions would care about the changes in this patch? + + this removes the handlers that it gives you + """ + # self.dependencies() + ret: Set[Callable[[], None]] = set() + affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads]) + for (s, p), funcs in self._handlersSp.items(): + if (s, p) in affectedSubjPreds: + ret.update(funcs) + funcs.clear() + + affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads] + [(p, o) for s, p, o, c in patch.delQuads]) + for (p, o), funcs in self._handlersPo.items(): + if (p, o) in affectedPredObjs: + ret.update(funcs) + funcs.clear() + + affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads] + [(s, p, o) for s, p, o, c in patch.delQuads]) + for triple, funcs in self._handlersSpo.items(): + if triple in affectedTriples: + ret.update(funcs) + funcs.clear() + + affectedSubjs = set([s for s, p, o, c in patch.addQuads] + [s for s, p, o, c in patch.delQuads]) + for subj, funcs in self._handlersS.items(): + if subj in affectedSubjs: + ret.update(funcs) + funcs.clear() + + return ret + + def dependencies(self): + """ + for debugging, make a list of all the active handlers and what + data they depend on. This is meant for showing on the web ui + for browsing. + """ + log.info("whocares:") + from pprint import pprint + pprint(self._handlersSp)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/currentstategraphapi.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,92 @@ +import itertools +import logging +import time +import traceback +from typing import Optional, Set, Tuple + +from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement +from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph +from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase +from rdflib import ConjunctiveGraph, URIRef +from rdflib.term import Node + +log = logging.getLogger("currentstate") + +TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]] + + +class Mgr(object): + + def __init__(self, graph, tripleFilter): + self._graph = graph + self._tripleFilter = tripleFilter + + def __enter__(self): + # this should be a readonly view of the existing + # graph, maybe with something to guard against + # writes/patches happening while reads are being + # done. Typical usage will do some reads on this graph + # before moving on to writes. + + if True: + g = ReadOnlyConjunctiveGraph(self._graph) + else: + t1 = time.time() + g = ConjunctiveGraph() + for s, p, o, c in self._graph.quads(self._tripleFilter): + g.store.add((s, p, o), c) + + if self._tripleFilter == (None, None, None): + self.logThisCopy(g, time.time() - t1) + + g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt) + return g + + def logThisCopy(self, g, sec): + log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000)) + for frame in traceback.format_stack(limit=4)[:-2]: + for line in frame.splitlines(): + log.info(" " + line) + + def __exit__(self, type, val, tb): + return + + +class CurrentStateGraphApi(SyncedGraphBase): + """ + mixin for SyncedGraph, separated here because these methods work together + """ + + def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr: + """ + a graph you can read without being in an addHandler + + you can save some time by passing a triple filter, and we'll only give you the matching triples + """ + if context is not None: + raise NotImplementedError("currentState with context arg") + + return Mgr(self._graph, tripleFilter) + + _reservedSequentials: Optional[Set[URIRef]] = None + + def sequentialUri(self, prefix: URIRef) -> URIRef: + """ + Prefix URIRef like http://example.com/r- will return + http://example.com/r-1 if that uri is not a subject in the graph, + or else http://example.com/r-2, etc + """ + if self._reservedSequentials is None: + self._reservedSequentials = set() + for i in itertools.count(1): + newUri = URIRef(prefix + str(i)) + if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))): + self._reservedSequentials.add(newUri) + return newUri + raise NotImplementedError("never reached") + + +def contextsForStatementNoWildcards(g, triple): + if None in triple: + raise NotImplementedError("no wildcards") + return rp_contextsForStatement(g, triple)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/currentstategraphapi_test.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,12 @@ +import unittest + +from rdfdb.syncedgraph.syncedgraph import SyncedGraph +from rdflib import URIRef + + +class TestSequentialUri(unittest.TestCase): + + def test_returnsSequentialUris(self): + g = SyncedGraph(URIRef('http://example.com/db/'), label='test') + self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo1')) + self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo2'))
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/grapheditapi.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,103 @@ +import logging +import random +from itertools import chain + +from rdfdb.patch import Patch, quadsWithContextUris +from rdfdb.syncedgraph.currentstategraphapi import CurrentStateGraphApi +from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase +from rdflib import RDF, URIRef +from rdflib.term import Node + +log = logging.getLogger('graphedit') + + +class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase): + """ + fancier graph edits + + mixin for SyncedGraph, separated here because these methods work together + """ + + def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch: + """send a patch which removes existing values for (s,p,*,c) + and adds (s,p,newObject,c). Values in other graphs are not affected. + + newObject can be None, which will remove all (subj,pred,*) statements. + """ + + existing = [] + for spoc in quadsWithContextUris(self._graph.quads((subject, predicate, None, context))): + existing.append(spoc) + toAdd = ([(subject, predicate, newObject, context)] if newObject is not None else []) + return Patch(delQuads=existing, addQuads=toAdd).simplify() + + def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node): + p = self.getObjectPatch(context, subject, predicate, newObject) + if not p.isNoop(): + log.debug("patchObject %r" % p.jsonRepr) + self.patch(p) # type: ignore + + def patchSubgraph(self, context, newGraph): + """ + replace all statements in 'context' with the quads in newGraph. + This is not cooperating with currentState. + """ + old = set(quadsWithContextUris(self._graph.quads((None, None, None, context)))) + new = set(quadsWithContextUris(newGraph)) + p = Patch(delQuads=old - new, addQuads=new - old) + self.patch(p) + return p # for debugging + + def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue): + """ + creates/updates a structure like this: + + ?subject ?predicate [ + a ?nodeClass; + ?keyPred ?newKey; + ?valuePred ?newValue ] . + + There should be a complementary readMapping that gets you a + value since that's tricky too + """ + + # as long as currentState is expensive and has the + # tripleFilter optimization, this looks like a mess. If + # currentState became cheap, a lot of code here could go away. + + with self.currentState(tripleFilter=(subject, predicate, None)) as current: + adds = set([]) + for setting in current.objects(subject, predicate): + with self.currentState(tripleFilter=(setting, keyPred, None)) as current2: + + match = current2.value(setting, keyPred) == newKey + if match: + break + else: + setting = URIRef(subject + "/map/%s" % random.randrange(999999999)) + adds.update([ + (subject, predicate, setting, context), + (setting, RDF.type, nodeClass, context), + (setting, keyPred, newKey, context), + ]) + + with self.currentState(tripleFilter=(setting, valuePred, None)) as current: + dels = set([]) + for prev in current.objects(setting, valuePred): + dels.add((setting, valuePred, prev, context)) + adds.add((setting, valuePred, newValue, context)) + + if adds != dels: + self.patch(Patch(delQuads=dels, addQuads=adds)) + + def removeMappingNode(self, context, node): + """ + removes the statements with this node as subject or object, which + is the right amount of statements to remove a node that + patchMapping made. + """ + p = Patch(delQuads=[ + spo + (context,) + for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context)) + ]) + self.patch(p)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/grapheditapi_test.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,30 @@ +import unittest + +from rdfdb.syncedgraph.grapheditapi import GraphEditApi +from rdflib import ConjunctiveGraph, URIRef + + +class TestPatchSubgraph(unittest.TestCase): + + def testCollapsesIdenticalQuads(self): + appliedPatches = [] + + class Obj(GraphEditApi): + + def __init__(self): + pass + + def patch(self, p): + appliedPatches.append(p) + + _graph: ConjunctiveGraph + + obj = Obj() + obj._graph = ConjunctiveGraph() + stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g')) + obj._graph.addN([stmt1]) + obj.patchSubgraph(URIRef('g'), [stmt1]) + self.assertEqual(len(appliedPatches), 1) + p = appliedPatches[0] + self.assertTrue(p.isNoop()) + self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/mock_syncedgraph.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,49 @@ +from rdflib import RDF, RDFS, Graph +from rdflib.parser import StringInputSource + + +class MockSyncedGraph(object): + """ + Lets users of SyncedGraph mostly work. Doesn't yet help with any + testing of the rerun-upon-graph-change behavior. + """ + + def __init__(self, n3Content): + self._graph = Graph() + self._graph.parse(StringInputSource(n3Content), format='n3') + + def addHandler(self, func): + func() + + def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True): + if object is not None: + raise NotImplementedError() + return self._graph.value(subject, predicate, object=object, default=default, any=any) + + def objects(self, subject=None, predicate=None): + return self._graph.objects(subject, predicate) + + def label(self, uri): + return self.value(uri, RDFS.label) + + def subjects(self, predicate=None, object=None): + return self._graph.subjects(predicate, object) + + def predicate_objects(self, subject): + return self._graph.predicate_objects(subject) + + def items(self, listUri): + """generator. Having a chain of watchers on the results is not + well-tested yet""" + chain = set([listUri]) + while listUri: + item = self.value(listUri, RDF.first) + if item: + yield item + listUri = self.value(listUri, RDF.rest) + if listUri in chain: + raise ValueError("List contains a recursive rdf:rest reference") + chain.add(listUri) + + def contains(self, triple): + return triple in self._graph
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/readonly_graph.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,17 @@ +class ReadOnlyConjunctiveGraph(object): + """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead + of a bunch of Graphs""" + + def __init__(self, graph): + self.graph = graph + + def __getattr__(self, attr): + if attr in ['subjects', 'value', 'objects', 'triples', 'label']: # not complete + return getattr(self.graph, attr) + raise TypeError("can't access %r of read-only graph" % attr) + + def __len__(self): + return len(self.graph) + + def contextsForStatement(self, stmt): + raise NotImplementedError
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/syncedgraph.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,42 @@ +""" +client code uses a SyncedGraph, which has a few things: + +AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a +way that lets me call you again when there were changes to the things +you queried + +CurrentStateGraphApi - a way to query the graph that doesn't gather +your dependencies like AutoDepGraphApi does + +GraphEditApi - methods to write patches to the graph for common +operations, e.g. replacing a value, or editing a mapping + +WsClientProtocol one connection with the rdfdb server. +""" + +from typing import Optional + +# everybody who writes literals needs to get this +from rdfdb.rdflibpatch_literal import patch +from rdfdb.syncedgraph.autodepgraphapi import AutoDepGraphApi +from rdfdb.syncedgraph.grapheditapi import GraphEditApi +from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase +from rdflib import URIRef + +patch() + + +class SyncedGraph(AutoDepGraphApi, GraphEditApi): + ''' + SyncedGraphBase + | | + CurState AutoDep + | | + GraphEdit | + | | + SyncedGraph + ''' + + def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): + SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost) + AutoDepGraphApi.__init__(self)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph/syncedgraph_base.py Sat May 21 22:15:14 2022 -0700 @@ -0,0 +1,230 @@ +import asyncio +import json +import logging +import traceback +from typing import Any, Optional, cast + +import aiohttp +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import patchQuads +from rdflib import ConjunctiveGraph, URIRef + +log = logging.getLogger('syncedgraph') + +# class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol): +# """The server for this is service.WebsocketClient""" + +# def __init__(self, sg): +# super().__init__() +# self.sg = sg +# self.sg.currentClient = self +# self.connectionId = None + +# def onConnect(self, response): +# log.info('conn %r', response) + +# def onOpen(self): +# log.info('ws open') +# self.sg.isConnected = True + +# def onMessage(self, payload, isBinary): +# msg = json.loads(payload) +# if 'connectedAs' in msg: +# self.connectionId = msg['connectedAs'] +# log.info(f'rdfdb calls us {self.connectionId}') +# elif 'patch' in msg: +# p = Patch(jsonRepr=payload.decode('utf8')) +# log.debug("received patch %s", p.shortSummary()) +# self.sg.onPatchFromDb(p) +# else: +# log.warn('unknown msg from websocket: %s...', payload[:32]) + +# def sendPatch(self, p: Patch): +# # this is where we could concatenate little patches into a +# # bigger one. Often, many statements will cancel each +# # other out. + +# # also who's going to accumulate patches when server is down, +# # or is that not allowed? +# if self.connectionId is None: +# raise ValueError("can't send patches before we get an id") +# body = p.makeJsonRepr() +# log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes') +# self.sendMessage(body.encode('utf8')) + +# def onClose(self, wasClean, code, reason): +# log.info("WebSocket connection closed: {0}".format(reason)) +# self.sg.lostRdfdbConnection() + +# reactor = cast(IReactorCore, twisted.internet.reactor) + + +class SyncedGraphBase(object): + """ + graph for clients to use. Changes are synced with the master graph + in the rdfdb process. + + self.patch(p: Patch) is the only way to write to the graph. + + Reading can be done with the AutoDepGraphApi methods which set up + watchers to call you back when the results of the read have + changed (like knockoutjs). Or you can read with + CurrentStateGraphApi which doesn't have watchers, but you have to + opt into using it so it's clear you aren't in an auto-dep context + and meant to set up watchers. + + You may want to attach to self.initiallySynced deferred so you + don't attempt patches before we've heard the initial contents of + the graph. It would be ok to accumulate some patches of new + material, but usually you won't correctly remove the existing + statements unless we have the correct graph. + + If we get out of sync, we abandon our local graph (even any + pending local changes) and get the data again from the server. + """ + + def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): + """ + label is a string that the server will display in association + with your connection + + receiverHost is the hostname other nodes can use to talk to me + """ + self.rdfdbRoot = rdfdbRoot + self.httpSession = aiohttp.ClientSession() + self._senderTask = asyncio.create_task(self._sender()) + + self._initiallySynced = asyncio.Future() + self._graph = ConjunctiveGraph() + + # todo: + # AutoDepGraphApi.__init__(self) + + # this needs more state to track if we're doing a resync (and + # everything has to error or wait) or if we're live + + async def _sender(self): + while True: + with self.httpSession.ws_connect(self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph') as ws: + async for msg in ws: + log.info(f"server sent us {msg=}") + # if msg.type == aiohttp.WSMsgType.TEXT: + # if msg.data == 'close cmd': + # await ws.close() + # break + # else: + # await ws.send_str(msg.data + '/answer') + # elif msg.type == aiohttp.WSMsgType.ERROR: + # break + self.lostRdfdbConnection() + log.info("lost connection- retry") + await asyncio.sleep(4) + + async def init(self): + """return when we have the initial graph from server. + + maybe this isn't really needed, as everything ought to be resilent + to the intial graph pouring in. + """ + await self._initiallySynced + + def lostRdfdbConnection(self) -> None: + self.isConnected = False + self.patch(Patch(delQuads=self._graph.quads())) + log.info(f'cleared graph to {len(self._graph)}') + log.error('graph is not updating- you need to restart') + + def resync(self): + """ + get the whole graph again from the server (e.g. we had a + conflict while applying a patch and want to return to the + truth). + + To avoid too much churn, we remember our old graph and diff it + against the replacement. This way, our callers only see the + corrections. + + Edits you make during a resync will surely be lost, so I + should just fail them. There should be a notification back to + UIs who want to show that we're doing a resync. + """ + log.info('resync') + if self.currentClient: + self.currentClient.dropConnection() + + def _resyncGraph(self, response): + log.warn("new graph in") + + if self.currentClient: + self.currentClient.dropConnection() + # diff against old entire graph + # broadcast that change + + def runDepsOnNewPatch(self, p): + # See AutoDepGraphApi + pass + + def patch(self, p: Patch) -> None: + """send this patch to the server and apply it to our local + graph and run handlers""" + + if not self.isConnected or self.currentClient is None: + log.warn("not currently connected- dropping patch") + return + + if p.isNoop(): + log.info("skipping no-op patch") + return + + # these could fail if we're out of sync. One approach: + # Rerequest the full state from the server, try the patch + # again after that, then give up. + debugKey = '[id=%s]' % (id(p) % 1000) + log.debug("\napply local patch %s %s", debugKey, p) + try: + self._applyPatchLocally(p) + except ValueError as e: + log.error(e) + self.resync() + return + log.debug('runDepsOnNewPatch') + self.runDepsOnNewPatch(p) + log.debug('sendPatch') + self.currentClient.sendPatch(p) + log.debug('patch is done %s', debugKey) + + async def suggestPrefixes(self, ctx, prefixes): + """ + when writing files for this ctx, try to use these n3 + prefixes. async, not guaranteed to finish before any + particular file flush + """ + await self.httpSession.post(self.rdfdbRoot + 'prefixes', data=json.dumps({'ctx': ctx, 'prefixes': prefixes})) + + def _applyPatchLocally(self, p: Patch): + # .. and disconnect on failure + patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) + log.debug("graph now has %s statements" % len(self._graph)) + + def onPatchFromDb(self, p): + """ + central server has sent us a patch + """ + if log.isEnabledFor(logging.DEBUG): + if len(p.addQuads) > 50: + log.debug('server has sent us %s', p.shortSummary()) + else: + log.debug('server has sent us %s', p) + + self._applyPatchLocally(p) + try: + self.runDepsOnNewPatch(p) + except Exception: + # don't reflect this error back to the server; we did + # receive its patch correctly. However, we're in a bad + # state since some dependencies may not have rerun + traceback.print_exc() + log.warn("some graph dependencies may not have completely run") + + if not self._initiallySynced.done(): + self._initiallySynced.set_result(None)
--- a/rdfdb/syncedgraph_base.py Sun May 15 15:52:01 2022 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,226 +0,0 @@ -import json -import logging -import traceback -import urllib.parse -from typing import Any, Optional, cast - -import autobahn.twisted.websocket -import treq -import twisted.internet.reactor -from rdflib import ConjunctiveGraph, URIRef -from twisted.internet import defer -from twisted.internet.interfaces import IReactorCore - -from rdfdb.patch import Patch -from rdfdb.rdflibpatch import patchQuads - - -class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol): - """The server for this is service.WebsocketClient""" - - def __init__(self, sg): - super().__init__() - self.sg = sg - self.sg.currentClient = self - self.connectionId = None - - def onConnect(self, response): - log.info('conn %r', response) - - def onOpen(self): - log.info('ws open') - self.sg.isConnected = True - - def onMessage(self, payload, isBinary): - msg = json.loads(payload) - if 'connectedAs' in msg: - self.connectionId = msg['connectedAs'] - log.info(f'rdfdb calls us {self.connectionId}') - elif 'patch' in msg: - p = Patch(jsonRepr=payload.decode('utf8')) - log.debug("received patch %s", p.shortSummary()) - self.sg.onPatchFromDb(p) - else: - log.warn('unknown msg from websocket: %s...', payload[:32]) - - def sendPatch(self, p: Patch): - # this is where we could concatenate little patches into a - # bigger one. Often, many statements will cancel each - # other out. - - # also who's going to accumulate patches when server is down, - # or is that not allowed? - if self.connectionId is None: - raise ValueError("can't send patches before we get an id") - body = p.makeJsonRepr() - log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes') - self.sendMessage(body.encode('utf8')) - - def onClose(self, wasClean, code, reason): - log.info("WebSocket connection closed: {0}".format(reason)) - self.sg.lostRdfdbConnection() - - -reactor = cast(IReactorCore, twisted.internet.reactor) - -log = logging.getLogger('syncedgraph') - - -class SyncedGraphBase(object): - """ - graph for clients to use. Changes are synced with the master graph - in the rdfdb process. - - self.patch(p: Patch) is the only way to write to the graph. - - Reading can be done with the AutoDepGraphApi methods which set up - watchers to call you back when the results of the read have - changed (like knockoutjs). Or you can read with - CurrentStateGraphApi which doesn't have watchers, but you have to - opt into using it so it's clear you aren't in an auto-dep context - and meant to set up watchers. - - You may want to attach to self.initiallySynced deferred so you - don't attempt patches before we've heard the initial contents of - the graph. It would be ok to accumulate some patches of new - material, but usually you won't correctly remove the existing - statements unless we have the correct graph. - - If we get out of sync, we abandon our local graph (even any - pending local changes) and get the data again from the server. - """ - - def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): - """ - label is a string that the server will display in association - with your connection - - receiverHost is the hostname other nodes can use to talk to me - """ - self.isConnected = False - self.currentClient: Optional[WsClientProtocol] = None - self.rdfdbRoot = rdfdbRoot - self.connectSocket() - self.initiallySynced: defer.Deferred[None] = defer.Deferred() - self._graph = ConjunctiveGraph() - - # todo: - # AutoDepGraphApi.__init__(self) - - # this needs more state to track if we're doing a resync (and - # everything has to error or wait) or if we're live - - def lostRdfdbConnection(self) -> None: - self.isConnected = False - self.patch(Patch(delQuads=self._graph.quads())) - log.info(f'cleared graph to {len(self._graph)}') - log.error('graph is not updating- you need to restart') - self.connectSocket() - - def connectSocket(self) -> None: - factory = autobahn.twisted.websocket.WebSocketClientFactory( - self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph', - # Don't know if this is required by spec, but - # cyclone.websocket breaks with no origin header. - origin='foo') - factory.protocol = cast(Any, lambda: WsClientProtocol(self)) - - rr = urllib.parse.urlparse(self.rdfdbRoot) - reactor.connectTCP(rr.hostname, rr.port, factory) - # WsClientProtocol sets our currentClient. Needs rewrite using agents. - - def resync(self): - """ - get the whole graph again from the server (e.g. we had a - conflict while applying a patch and want to return to the - truth). - - To avoid too much churn, we remember our old graph and diff it - against the replacement. This way, our callers only see the - corrections. - - Edits you make during a resync will surely be lost, so I - should just fail them. There should be a notification back to - UIs who want to show that we're doing a resync. - """ - log.info('resync') - if self.currentClient: - self.currentClient.dropConnection() - - def _resyncGraph(self, response): - log.warn("new graph in") - - if self.currentClient: - self.currentClient.dropConnection() - # diff against old entire graph - # broadcast that change - - def runDepsOnNewPatch(self, p): - # See AutoDepGraphApi - pass - - def patch(self, p: Patch) -> None: - """send this patch to the server and apply it to our local - graph and run handlers""" - - if not self.isConnected or self.currentClient is None: - log.warn("not currently connected- dropping patch") - return - - if p.isNoop(): - log.info("skipping no-op patch") - return - - # these could fail if we're out of sync. One approach: - # Rerequest the full state from the server, try the patch - # again after that, then give up. - debugKey = '[id=%s]' % (id(p) % 1000) - log.debug("\napply local patch %s %s", debugKey, p) - try: - self._applyPatchLocally(p) - except ValueError as e: - log.error(e) - self.resync() - return - log.debug('runDepsOnNewPatch') - self.runDepsOnNewPatch(p) - log.debug('sendPatch') - self.currentClient.sendPatch(p) - log.debug('patch is done %s', debugKey) - - def suggestPrefixes(self, ctx, prefixes): - """ - when writing files for this ctx, try to use these n3 - prefixes. async, not guaranteed to finish before any - particular file flush - """ - treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8')) - - def _applyPatchLocally(self, p: Patch): - # .. and disconnect on failure - patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) - log.debug("graph now has %s statements" % len(self._graph)) - - def onPatchFromDb(self, p): - """ - central server has sent us a patch - """ - if log.isEnabledFor(logging.DEBUG): - if len(p.addQuads) > 50: - log.debug('server has sent us %s', p.shortSummary()) - else: - log.debug('server has sent us %s', p) - - self._applyPatchLocally(p) - try: - self.runDepsOnNewPatch(p) - except Exception: - # don't reflect this error back to the server; we did - # receive its patch correctly. However, we're in a bad - # state since some dependencies may not have rerun - traceback.print_exc() - log.warn("some graph dependencies may not have completely run") - - if self.initiallySynced: - self.initiallySynced.callback(None) - self.initiallySynced = None