changeset 94:8bc63c7b619b

mv syncedgraph to subdir
author drewp@bigasterisk.com
date Sat, 21 May 2022 22:15:14 -0700
parents f22723264489
children 270f39920b30
files rdfdb/autodepgraphapi.py rdfdb/currentstategraphapi.py rdfdb/currentstategraphapi_test.py rdfdb/grapheditapi.py rdfdb/grapheditapi_test.py rdfdb/mock_syncedgraph.py rdfdb/readonly_graph.py rdfdb/syncedgraph.py rdfdb/syncedgraph/README.md rdfdb/syncedgraph/__init__.py rdfdb/syncedgraph/autodepgraphapi.py rdfdb/syncedgraph/currentstategraphapi.py rdfdb/syncedgraph/currentstategraphapi_test.py rdfdb/syncedgraph/grapheditapi.py rdfdb/syncedgraph/grapheditapi_test.py rdfdb/syncedgraph/mock_syncedgraph.py rdfdb/syncedgraph/readonly_graph.py rdfdb/syncedgraph/syncedgraph.py rdfdb/syncedgraph/syncedgraph_base.py rdfdb/syncedgraph_base.py
diffstat 19 files changed, 800 insertions(+), 796 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/autodepgraphapi.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,224 +0,0 @@
-import logging
-from typing import Callable, Dict, List, Set, Tuple
-from rdfdb.syncedgraph_base import SyncedGraphBase
-
-from rdflib import RDF, RDFS, URIRef
-
-from rdfdb.currentstategraphapi import contextsForStatementNoWildcards
-
-log = logging.getLogger('autodepgraphapi')
-
-
-class AutoDepGraphApi(SyncedGraphBase):
-    """
-    knockoutjs-inspired API for automatically building a dependency
-    tree while reading the graph. See addHandler().
-
-    This design is more aggressive than knockoutjs, since the obvious
-    query methods like value() all error if you're not in a watched
-    section of code. This is supposed to make it easier to notice
-    dependency mistakes, especially when porting old code to use
-    SyncedGraph.
-
-    This class is a mixin for SyncedGraph, separated here because
-    these methods work together
-    """
-
-    def __init__(self):
-        self._watchers = _GraphWatchers()
-        self.currentFuncs: List[Callable[[], None]] = []  # stack of addHandler callers
-
-    def addHandler(self, func: Callable[[], None]) -> None:
-        """
-        run this (idempotent) func, noting what graph values it
-        uses. Run it again in the future if there are changes to those
-        graph values. The func might use different values during that
-        future call, and those will be what we watch for next.
-        """
-
-        # if we saw this func before, we need to forget the old
-        # callbacks it wanted and replace with the new ones we see
-        # now.
-
-        # if one handler func calls another, does that break anything?
-        # maybe not?
-
-        # no plan for sparql queries yet. Hook into a lower layer that
-        # reveals all their statement fetches? Just make them always
-        # new? Cache their results, so if i make the query again and
-        # it gives the same result, I don't call the handler?
-
-        self.currentFuncs.append(func)
-        log.debug('graph.currentFuncs push %s', func)
-        try:
-            try:
-                func()
-            except:
-                import traceback
-                traceback.print_exc()
-                raise
-        finally:
-            self.currentFuncs.pop()
-            log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs))
-
-    def runDepsOnNewPatch(self, p):
-        """
-        patch p just happened to the graph; call everyone back who
-        might care, and then notice what data they depend on now
-        """
-        for func in self._watchers.whoCares(p):
-            # todo: forget the old handlers for this func
-            log.debug('runDepsOnNewPatch calling watcher %s', p)
-            self.addHandler(func)
-
-    def _getCurrentFunc(self):
-        if not self.currentFuncs:
-            # this may become a warning later
-            raise ValueError("asked for graph data outside of a handler")
-
-        # we add the watcher to the deepest function, since that
-        # should be the cheapest way to update when this part of the
-        # data changes
-        return self.currentFuncs[-1]
-
-    # these just call through to triples() so it might be possible to
-    # watch just that one.
-
-    # if you get a bnode in your response, maybe the answer to
-    # dependency tracking is to say that you depended on the triple
-    # that got you that bnode, since it is likely to change to another
-    # bnode later. This won't work if the receiver stores bnodes
-    # between calls, but probably most of them don't do that (they
-    # work from a starting uri)
-
-    def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True):
-        if object is not None:
-            raise NotImplementedError()
-        func = self._getCurrentFunc()
-        self._watchers.addSubjPredWatcher(func, subject, predicate)
-        return self._graph.value(subject, predicate, object=object, default=default, any=any)
-
-    def objects(self, subject=None, predicate=None):
-        func = self._getCurrentFunc()
-        self._watchers.addSubjPredWatcher(func, subject, predicate)
-        return self._graph.objects(subject, predicate)
-
-    def label(self, uri):
-        return self.value(uri, RDFS.label)
-
-    def subjects(self, predicate=None, object=None):
-        func = self._getCurrentFunc()
-        self._watchers.addPredObjWatcher(func, predicate, object)
-        return self._graph.subjects(predicate, object)
-
-    def predicate_objects(self, subject):
-        func = self._getCurrentFunc()
-        self._watchers.addSubjectWatcher(func, subject)
-        return self._graph.predicate_objects(subject)
-
-    def items(self, listUri):
-        """generator. Having a chain of watchers on the results is not
-        well-tested yet"""
-        chain = set([listUri])
-        while listUri:
-            item = self.value(listUri, RDF.first)
-            if item:
-                yield item
-            listUri = self.value(listUri, RDF.rest)
-            if listUri in chain:
-                raise ValueError("List contains a recursive rdf:rest reference")
-            chain.add(listUri)
-
-    def contains(self, triple):
-        func = self._getCurrentFunc()
-        self._watchers.addTripleWatcher(func, triple)
-        return triple in self._graph
-
-    def contextsForStatement(self, triple):
-        """currently this needs to be in an addHandler section, but it
-        sets no watchers so it won't actually update if the statement
-        was added or dropped from contexts"""
-        # func = self._getCurrentFunc()
-        return contextsForStatementNoWildcards(self._graph, triple)
-
-    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
-    # you only what results have just appeared or disappeared. I think
-    # I'm going to be repeating that logic a lot. Maybe just for the
-    # subjects(RDF.type, t) call
-
-
-HandlerSet = Set[Callable[[], None]]
-
-
-class _GraphWatchers(object):
-    """
-    store the current handlers that care about graph changes
-    """
-
-    def __init__(self):
-        self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = {}  # (s,p): set(handlers)
-        self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = {}  # (p,o): set(handlers)
-        self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {}  # (s,p,o): set(handlers)
-        self._handlersS: Dict[URIRef, HandlerSet] = {}  # s: set(handlers)
-
-    def addSubjPredWatcher(self, func, s, p):
-        if func is None:
-            return
-        key = s, p
-        try:
-            self._handlersSp.setdefault(key, set()).add(func)
-        except Exception:
-            log.error("with key %r and func %r" % (key, func))
-            raise
-
-    def addPredObjWatcher(self, func, p, o):
-        self._handlersPo.setdefault((p, o), set()).add(func)
-
-    def addTripleWatcher(self, func, triple):
-        self._handlersSpo.setdefault(triple, set()).add(func)
-
-    def addSubjectWatcher(self, func, s):
-        self._handlersS.setdefault(s, set()).add(func)
-
-    def whoCares(self, patch):
-        """what handler functions would care about the changes in this patch?
-
-        this removes the handlers that it gives you
-        """
-        # self.dependencies()
-        ret: Set[Callable[[], None]] = set()
-        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads])
-        for (s, p), funcs in self._handlersSp.items():
-            if (s, p) in affectedSubjPreds:
-                ret.update(funcs)
-                funcs.clear()
-
-        affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads] + [(p, o) for s, p, o, c in patch.delQuads])
-        for (p, o), funcs in self._handlersPo.items():
-            if (p, o) in affectedPredObjs:
-                ret.update(funcs)
-                funcs.clear()
-
-        affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads] + [(s, p, o) for s, p, o, c in patch.delQuads])
-        for triple, funcs in self._handlersSpo.items():
-            if triple in affectedTriples:
-                ret.update(funcs)
-                funcs.clear()
-
-        affectedSubjs = set([s for s, p, o, c in patch.addQuads] + [s for s, p, o, c in patch.delQuads])
-        for subj, funcs in self._handlersS.items():
-            if subj in affectedSubjs:
-                ret.update(funcs)
-                funcs.clear()
-
-        return ret
-
-    def dependencies(self):
-        """
-        for debugging, make a list of all the active handlers and what
-        data they depend on. This is meant for showing on the web ui
-        for browsing.
-        """
-        log.info("whocares:")
-        from pprint import pprint
-        pprint(self._handlersSp)
--- a/rdfdb/currentstategraphapi.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,93 +0,0 @@
-import itertools
-import logging
-import time
-import traceback
-from typing import Optional, Set, Tuple
-
-from rdflib import ConjunctiveGraph, URIRef
-from rdflib.term import Node
-
-from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
-from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph
-from rdfdb.syncedgraph_base import SyncedGraphBase
-
-log = logging.getLogger("currentstate")
-
-TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]]
-
-
-class Mgr(object):
-
-    def __init__(self, graph, tripleFilter):
-        self._graph = graph
-        self._tripleFilter = tripleFilter
-
-    def __enter__(self):
-        # this should be a readonly view of the existing
-        # graph, maybe with something to guard against
-        # writes/patches happening while reads are being
-        # done. Typical usage will do some reads on this graph
-        # before moving on to writes.
-
-        if True:
-            g = ReadOnlyConjunctiveGraph(self._graph)
-        else:
-            t1 = time.time()
-            g = ConjunctiveGraph()
-            for s, p, o, c in self._graph.quads(self._tripleFilter):
-                g.store.add((s, p, o), c)
-
-            if self._tripleFilter == (None, None, None):
-                self.logThisCopy(g, time.time() - t1)
-
-        g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt)
-        return g
-
-    def logThisCopy(self, g, sec):
-        log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
-        for frame in traceback.format_stack(limit=4)[:-2]:
-            for line in frame.splitlines():
-                log.info("  " + line)
-
-    def __exit__(self, type, val, tb):
-        return
-
-
-class CurrentStateGraphApi(SyncedGraphBase):
-    """
-    mixin for SyncedGraph, separated here because these methods work together
-    """
-
-    def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr:
-        """
-        a graph you can read without being in an addHandler
-
-        you can save some time by passing a triple filter, and we'll only give you the matching triples
-        """
-        if context is not None:
-            raise NotImplementedError("currentState with context arg")
-
-        return Mgr(self._graph, tripleFilter)
-
-    _reservedSequentials: Optional[Set[URIRef]] = None
-
-    def sequentialUri(self, prefix: URIRef) -> URIRef:
-        """
-        Prefix URIRef like http://example.com/r- will return
-        http://example.com/r-1 if that uri is not a subject in the graph,
-        or else http://example.com/r-2, etc
-        """
-        if self._reservedSequentials is None:
-            self._reservedSequentials = set()
-        for i in itertools.count(1):
-            newUri = URIRef(prefix + str(i))
-            if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))):
-                self._reservedSequentials.add(newUri)
-                return newUri
-        raise NotImplementedError("never reached")
-
-
-def contextsForStatementNoWildcards(g, triple):
-    if None in triple:
-        raise NotImplementedError("no wildcards")
-    return rp_contextsForStatement(g, triple)
--- a/rdfdb/currentstategraphapi_test.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,13 +0,0 @@
-import unittest
-
-from rdflib import URIRef
-
-from rdfdb.syncedgraph import SyncedGraph
-
-
-class TestSequentialUri(unittest.TestCase):
-
-    def test_returnsSequentialUris(self):
-        g = SyncedGraph(URIRef('http://example.com/db/'), label='test')
-        self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo1'))
-        self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo2'))
--- a/rdfdb/grapheditapi.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,104 +0,0 @@
-import logging
-import random
-from itertools import chain
-from rdfdb.currentstategraphapi import CurrentStateGraphApi
-from rdfdb.syncedgraph_base import SyncedGraphBase
-
-from rdflib import RDF, URIRef
-from rdflib.term import Node
-
-from rdfdb.patch import Patch, quadsWithContextUris
-
-log = logging.getLogger('graphedit')
-
-
-class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase):
-    """
-    fancier graph edits
-
-    mixin for SyncedGraph, separated here because these methods work together
-    """
-
-    def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch:
-        """send a patch which removes existing values for (s,p,*,c)
-        and adds (s,p,newObject,c). Values in other graphs are not affected.
-
-        newObject can be None, which will remove all (subj,pred,*) statements.
-        """
-
-        existing = []
-        for spoc in quadsWithContextUris(self._graph.quads((subject, predicate, None, context))):
-            existing.append(spoc)
-        toAdd = ([(subject, predicate, newObject, context)] if newObject is not None else [])
-        return Patch(delQuads=existing, addQuads=toAdd).simplify()
-
-    def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node):
-        p = self.getObjectPatch(context, subject, predicate, newObject)
-        if not p.isNoop():
-            log.debug("patchObject %r" % p.jsonRepr)
-        self.patch(p)  # type: ignore
-
-    def patchSubgraph(self, context, newGraph):
-        """
-        replace all statements in 'context' with the quads in newGraph.
-        This is not cooperating with currentState.
-        """
-        old = set(quadsWithContextUris(self._graph.quads((None, None, None, context))))
-        new = set(quadsWithContextUris(newGraph))
-        p = Patch(delQuads=old - new, addQuads=new - old)
-        self.patch(p)
-        return p  # for debugging
-
-    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
-        """
-        creates/updates a structure like this:
-
-           ?subject ?predicate [
-             a ?nodeClass;
-             ?keyPred ?newKey;
-             ?valuePred ?newValue ] .
-
-        There should be a complementary readMapping that gets you a
-        value since that's tricky too
-        """
-
-        # as long as currentState is expensive and has the
-        # tripleFilter optimization, this looks like a mess. If
-        # currentState became cheap, a lot of code here could go away.
-
-        with self.currentState(tripleFilter=(subject, predicate, None)) as current:
-            adds = set([])
-            for setting in current.objects(subject, predicate):
-                with self.currentState(tripleFilter=(setting, keyPred, None)) as current2:
-
-                    match = current2.value(setting, keyPred) == newKey
-                if match:
-                    break
-            else:
-                setting = URIRef(subject + "/map/%s" % random.randrange(999999999))
-                adds.update([
-                    (subject, predicate, setting, context),
-                    (setting, RDF.type, nodeClass, context),
-                    (setting, keyPred, newKey, context),
-                ])
-
-        with self.currentState(tripleFilter=(setting, valuePred, None)) as current:
-            dels = set([])
-            for prev in current.objects(setting, valuePred):
-                dels.add((setting, valuePred, prev, context))
-            adds.add((setting, valuePred, newValue, context))
-
-            if adds != dels:
-                self.patch(Patch(delQuads=dels, addQuads=adds))
-
-    def removeMappingNode(self, context, node):
-        """
-        removes the statements with this node as subject or object, which
-        is the right amount of statements to remove a node that
-        patchMapping made.
-        """
-        p = Patch(delQuads=[
-            spo + (context,)
-            for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context))
-        ])
-        self.patch(p)
--- a/rdfdb/grapheditapi_test.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-import unittest
-
-from rdflib import ConjunctiveGraph, URIRef
-
-from rdfdb.grapheditapi import GraphEditApi
-
-
-class TestPatchSubgraph(unittest.TestCase):
-
-    def testCollapsesIdenticalQuads(self):
-        appliedPatches = []
-
-        class Obj(GraphEditApi):
-            def __init__(self):
-                pass
-
-            def patch(self, p):
-                appliedPatches.append(p)
-
-            _graph: ConjunctiveGraph
-
-        obj = Obj()
-        obj._graph = ConjunctiveGraph()
-        stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
-        obj._graph.addN([stmt1])
-        obj.patchSubgraph(URIRef('g'), [stmt1])
-        self.assertEqual(len(appliedPatches), 1)
-        p = appliedPatches[0]
-        self.assertTrue(p.isNoop())
-        self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- a/rdfdb/mock_syncedgraph.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,49 +0,0 @@
-from rdflib import RDF, RDFS, Graph
-from rdflib.parser import StringInputSource
-
-
-class MockSyncedGraph(object):
-    """
-    Lets users of SyncedGraph mostly work. Doesn't yet help with any
-    testing of the rerun-upon-graph-change behavior.
-    """
-
-    def __init__(self, n3Content):
-        self._graph = Graph()
-        self._graph.parse(StringInputSource(n3Content), format='n3')
-
-    def addHandler(self, func):
-        func()
-
-    def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True):
-        if object is not None:
-            raise NotImplementedError()
-        return self._graph.value(subject, predicate, object=object, default=default, any=any)
-
-    def objects(self, subject=None, predicate=None):
-        return self._graph.objects(subject, predicate)
-
-    def label(self, uri):
-        return self.value(uri, RDFS.label)
-
-    def subjects(self, predicate=None, object=None):
-        return self._graph.subjects(predicate, object)
-
-    def predicate_objects(self, subject):
-        return self._graph.predicate_objects(subject)
-
-    def items(self, listUri):
-        """generator. Having a chain of watchers on the results is not
-        well-tested yet"""
-        chain = set([listUri])
-        while listUri:
-            item = self.value(listUri, RDF.first)
-            if item:
-                yield item
-            listUri = self.value(listUri, RDF.rest)
-            if listUri in chain:
-                raise ValueError("List contains a recursive rdf:rest reference")
-            chain.add(listUri)
-
-    def contains(self, triple):
-        return triple in self._graph
--- a/rdfdb/readonly_graph.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,17 +0,0 @@
-class ReadOnlyConjunctiveGraph(object):
-    """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead
-    of a bunch of Graphs"""
-
-    def __init__(self, graph):
-        self.graph = graph
-
-    def __getattr__(self, attr):
-        if attr in ['subjects', 'value', 'objects', 'triples', 'label']:  # not complete
-            return getattr(self.graph, attr)
-        raise TypeError("can't access %r of read-only graph" % attr)
-
-    def __len__(self):
-        return len(self.graph)
-
-    def contextsForStatement(self, stmt):
-        raise NotImplementedError
--- a/rdfdb/syncedgraph.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,40 +0,0 @@
-"""
-client code uses a SyncedGraph, which has a few things:
-
-AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
-way that lets me call you again when there were changes to the things
-you queried
-
-CurrentStateGraphApi - a way to query the graph that doesn't gather
-your dependencies like AutoDepGraphApi does
-
-GraphEditApi - methods to write patches to the graph for common
-operations, e.g. replacing a value, or editing a mapping
-
-WsClientProtocol one connection with the rdfdb server.
-"""
-
-from typing import Optional
-from rdfdb.autodepgraphapi import AutoDepGraphApi
-from rdfdb.grapheditapi import GraphEditApi
-# everybody who writes literals needs to get this
-from rdfdb.rdflibpatch_literal import patch
-from rdfdb.syncedgraph_base import SyncedGraphBase
-from rdflib import URIRef
-patch()
-
-
-class SyncedGraph(AutoDepGraphApi, GraphEditApi):
-    '''
-    SyncedGraphBase
-       |       |
-    CurState AutoDep
-       |       |
-    GraphEdit  |
-          |    |
-       SyncedGraph
-    '''
-    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
-        SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost)
-        AutoDepGraphApi.__init__(self)
-
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/README.md	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,1 @@
+client for rdfdb.service that sends/receives patches over a websocket connection
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/autodepgraphapi.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,224 @@
+import logging
+from typing import Callable, Dict, List, Set, Tuple
+
+from rdfdb.syncedgraph.currentstategraphapi import \
+    contextsForStatementNoWildcards
+from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase
+from rdflib import RDF, RDFS, URIRef
+
+log = logging.getLogger('autodepgraphapi')
+
+
+class AutoDepGraphApi(SyncedGraphBase):
+    """
+    knockoutjs-inspired API for automatically building a dependency
+    tree while reading the graph. See addHandler().
+
+    This design is more aggressive than knockoutjs, since the obvious
+    query methods like value() all error if you're not in a watched
+    section of code. This is supposed to make it easier to notice
+    dependency mistakes, especially when porting old code to use
+    SyncedGraph.
+
+    This class is a mixin for SyncedGraph, separated here because
+    these methods work together
+    """
+
+    def __init__(self):
+        self._watchers = _GraphWatchers()
+        self.currentFuncs: List[Callable[[], None]] = []  # stack of addHandler callers
+
+    def addHandler(self, func: Callable[[], None]) -> None:
+        """
+        run this (idempotent) func, noting what graph values it
+        uses. Run it again in the future if there are changes to those
+        graph values. The func might use different values during that
+        future call, and those will be what we watch for next.
+        """
+
+        # if we saw this func before, we need to forget the old
+        # callbacks it wanted and replace with the new ones we see
+        # now.
+
+        # if one handler func calls another, does that break anything?
+        # maybe not?
+
+        # no plan for sparql queries yet. Hook into a lower layer that
+        # reveals all their statement fetches? Just make them always
+        # new? Cache their results, so if i make the query again and
+        # it gives the same result, I don't call the handler?
+
+        self.currentFuncs.append(func)
+        log.debug('graph.currentFuncs push %s', func)
+        try:
+            try:
+                func()
+            except:
+                import traceback
+                traceback.print_exc()
+                raise
+        finally:
+            self.currentFuncs.pop()
+            log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs))
+
+    def runDepsOnNewPatch(self, p):
+        """
+        patch p just happened to the graph; call everyone back who
+        might care, and then notice what data they depend on now
+        """
+        for func in self._watchers.whoCares(p):
+            # todo: forget the old handlers for this func
+            log.debug('runDepsOnNewPatch calling watcher %s', p)
+            self.addHandler(func)
+
+    def _getCurrentFunc(self):
+        if not self.currentFuncs:
+            # this may become a warning later
+            raise ValueError("asked for graph data outside of a handler")
+
+        # we add the watcher to the deepest function, since that
+        # should be the cheapest way to update when this part of the
+        # data changes
+        return self.currentFuncs[-1]
+
+    # these just call through to triples() so it might be possible to
+    # watch just that one.
+
+    # if you get a bnode in your response, maybe the answer to
+    # dependency tracking is to say that you depended on the triple
+    # that got you that bnode, since it is likely to change to another
+    # bnode later. This won't work if the receiver stores bnodes
+    # between calls, but probably most of them don't do that (they
+    # work from a starting uri)
+
+    def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True):
+        if object is not None:
+            raise NotImplementedError()
+        func = self._getCurrentFunc()
+        self._watchers.addSubjPredWatcher(func, subject, predicate)
+        return self._graph.value(subject, predicate, object=object, default=default, any=any)
+
+    def objects(self, subject=None, predicate=None):
+        func = self._getCurrentFunc()
+        self._watchers.addSubjPredWatcher(func, subject, predicate)
+        return self._graph.objects(subject, predicate)
+
+    def label(self, uri):
+        return self.value(uri, RDFS.label)
+
+    def subjects(self, predicate=None, object=None):
+        func = self._getCurrentFunc()
+        self._watchers.addPredObjWatcher(func, predicate, object)
+        return self._graph.subjects(predicate, object)
+
+    def predicate_objects(self, subject):
+        func = self._getCurrentFunc()
+        self._watchers.addSubjectWatcher(func, subject)
+        return self._graph.predicate_objects(subject)
+
+    def items(self, listUri):
+        """generator. Having a chain of watchers on the results is not
+        well-tested yet"""
+        chain = set([listUri])
+        while listUri:
+            item = self.value(listUri, RDF.first)
+            if item:
+                yield item
+            listUri = self.value(listUri, RDF.rest)
+            if listUri in chain:
+                raise ValueError("List contains a recursive rdf:rest reference")
+            chain.add(listUri)
+
+    def contains(self, triple):
+        func = self._getCurrentFunc()
+        self._watchers.addTripleWatcher(func, triple)
+        return triple in self._graph
+
+    def contextsForStatement(self, triple):
+        """currently this needs to be in an addHandler section, but it
+        sets no watchers so it won't actually update if the statement
+        was added or dropped from contexts"""
+        # func = self._getCurrentFunc()
+        return contextsForStatementNoWildcards(self._graph, triple)
+
+    # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
+    # you only what results have just appeared or disappeared. I think
+    # I'm going to be repeating that logic a lot. Maybe just for the
+    # subjects(RDF.type, t) call
+
+
+HandlerSet = Set[Callable[[], None]]
+
+
+class _GraphWatchers(object):
+    """
+    store the current handlers that care about graph changes
+    """
+
+    def __init__(self):
+        self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = {}  # (s,p): set(handlers)
+        self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = {}  # (p,o): set(handlers)
+        self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {}  # (s,p,o): set(handlers)
+        self._handlersS: Dict[URIRef, HandlerSet] = {}  # s: set(handlers)
+
+    def addSubjPredWatcher(self, func, s, p):
+        if func is None:
+            return
+        key = s, p
+        try:
+            self._handlersSp.setdefault(key, set()).add(func)
+        except Exception:
+            log.error("with key %r and func %r" % (key, func))
+            raise
+
+    def addPredObjWatcher(self, func, p, o):
+        self._handlersPo.setdefault((p, o), set()).add(func)
+
+    def addTripleWatcher(self, func, triple):
+        self._handlersSpo.setdefault(triple, set()).add(func)
+
+    def addSubjectWatcher(self, func, s):
+        self._handlersS.setdefault(s, set()).add(func)
+
+    def whoCares(self, patch):
+        """what handler functions would care about the changes in this patch?
+
+        this removes the handlers that it gives you
+        """
+        # self.dependencies()
+        ret: Set[Callable[[], None]] = set()
+        affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads])
+        for (s, p), funcs in self._handlersSp.items():
+            if (s, p) in affectedSubjPreds:
+                ret.update(funcs)
+                funcs.clear()
+
+        affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads] + [(p, o) for s, p, o, c in patch.delQuads])
+        for (p, o), funcs in self._handlersPo.items():
+            if (p, o) in affectedPredObjs:
+                ret.update(funcs)
+                funcs.clear()
+
+        affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads] + [(s, p, o) for s, p, o, c in patch.delQuads])
+        for triple, funcs in self._handlersSpo.items():
+            if triple in affectedTriples:
+                ret.update(funcs)
+                funcs.clear()
+
+        affectedSubjs = set([s for s, p, o, c in patch.addQuads] + [s for s, p, o, c in patch.delQuads])
+        for subj, funcs in self._handlersS.items():
+            if subj in affectedSubjs:
+                ret.update(funcs)
+                funcs.clear()
+
+        return ret
+
+    def dependencies(self):
+        """
+        for debugging, make a list of all the active handlers and what
+        data they depend on. This is meant for showing on the web ui
+        for browsing.
+        """
+        log.info("whocares:")
+        from pprint import pprint
+        pprint(self._handlersSp)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/currentstategraphapi.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,92 @@
+import itertools
+import logging
+import time
+import traceback
+from typing import Optional, Set, Tuple
+
+from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
+from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph
+from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase
+from rdflib import ConjunctiveGraph, URIRef
+from rdflib.term import Node
+
+log = logging.getLogger("currentstate")
+
+TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]]
+
+
+class Mgr(object):
+
+    def __init__(self, graph, tripleFilter):
+        self._graph = graph
+        self._tripleFilter = tripleFilter
+
+    def __enter__(self):
+        # this should be a readonly view of the existing
+        # graph, maybe with something to guard against
+        # writes/patches happening while reads are being
+        # done. Typical usage will do some reads on this graph
+        # before moving on to writes.
+
+        if True:
+            g = ReadOnlyConjunctiveGraph(self._graph)
+        else:
+            t1 = time.time()
+            g = ConjunctiveGraph()
+            for s, p, o, c in self._graph.quads(self._tripleFilter):
+                g.store.add((s, p, o), c)
+
+            if self._tripleFilter == (None, None, None):
+                self.logThisCopy(g, time.time() - t1)
+
+        g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt)
+        return g
+
+    def logThisCopy(self, g, sec):
+        log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
+        for frame in traceback.format_stack(limit=4)[:-2]:
+            for line in frame.splitlines():
+                log.info("  " + line)
+
+    def __exit__(self, type, val, tb):
+        return
+
+
+class CurrentStateGraphApi(SyncedGraphBase):
+    """
+    mixin for SyncedGraph, separated here because these methods work together
+    """
+
+    def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr:
+        """
+        a graph you can read without being in an addHandler
+
+        you can save some time by passing a triple filter, and we'll only give you the matching triples
+        """
+        if context is not None:
+            raise NotImplementedError("currentState with context arg")
+
+        return Mgr(self._graph, tripleFilter)
+
+    _reservedSequentials: Optional[Set[URIRef]] = None
+
+    def sequentialUri(self, prefix: URIRef) -> URIRef:
+        """
+        Prefix URIRef like http://example.com/r- will return
+        http://example.com/r-1 if that uri is not a subject in the graph,
+        or else http://example.com/r-2, etc
+        """
+        if self._reservedSequentials is None:
+            self._reservedSequentials = set()
+        for i in itertools.count(1):
+            newUri = URIRef(prefix + str(i))
+            if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))):
+                self._reservedSequentials.add(newUri)
+                return newUri
+        raise NotImplementedError("never reached")
+
+
+def contextsForStatementNoWildcards(g, triple):
+    if None in triple:
+        raise NotImplementedError("no wildcards")
+    return rp_contextsForStatement(g, triple)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/currentstategraphapi_test.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,12 @@
+import unittest
+
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import URIRef
+
+
+class TestSequentialUri(unittest.TestCase):
+
+    def test_returnsSequentialUris(self):
+        g = SyncedGraph(URIRef('http://example.com/db/'), label='test')
+        self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo1'))
+        self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo2'))
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/grapheditapi.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,103 @@
+import logging
+import random
+from itertools import chain
+
+from rdfdb.patch import Patch, quadsWithContextUris
+from rdfdb.syncedgraph.currentstategraphapi import CurrentStateGraphApi
+from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase
+from rdflib import RDF, URIRef
+from rdflib.term import Node
+
+log = logging.getLogger('graphedit')
+
+
+class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase):
+    """
+    fancier graph edits
+
+    mixin for SyncedGraph, separated here because these methods work together
+    """
+
+    def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch:
+        """send a patch which removes existing values for (s,p,*,c)
+        and adds (s,p,newObject,c). Values in other graphs are not affected.
+
+        newObject can be None, which will remove all (subj,pred,*) statements.
+        """
+
+        existing = []
+        for spoc in quadsWithContextUris(self._graph.quads((subject, predicate, None, context))):
+            existing.append(spoc)
+        toAdd = ([(subject, predicate, newObject, context)] if newObject is not None else [])
+        return Patch(delQuads=existing, addQuads=toAdd).simplify()
+
+    def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node):
+        p = self.getObjectPatch(context, subject, predicate, newObject)
+        if not p.isNoop():
+            log.debug("patchObject %r" % p.jsonRepr)
+        self.patch(p)  # type: ignore
+
+    def patchSubgraph(self, context, newGraph):
+        """
+        replace all statements in 'context' with the quads in newGraph.
+        This is not cooperating with currentState.
+        """
+        old = set(quadsWithContextUris(self._graph.quads((None, None, None, context))))
+        new = set(quadsWithContextUris(newGraph))
+        p = Patch(delQuads=old - new, addQuads=new - old)
+        self.patch(p)
+        return p  # for debugging
+
+    def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue):
+        """
+        creates/updates a structure like this:
+
+           ?subject ?predicate [
+             a ?nodeClass;
+             ?keyPred ?newKey;
+             ?valuePred ?newValue ] .
+
+        There should be a complementary readMapping that gets you a
+        value since that's tricky too
+        """
+
+        # as long as currentState is expensive and has the
+        # tripleFilter optimization, this looks like a mess. If
+        # currentState became cheap, a lot of code here could go away.
+
+        with self.currentState(tripleFilter=(subject, predicate, None)) as current:
+            adds = set([])
+            for setting in current.objects(subject, predicate):
+                with self.currentState(tripleFilter=(setting, keyPred, None)) as current2:
+
+                    match = current2.value(setting, keyPred) == newKey
+                if match:
+                    break
+            else:
+                setting = URIRef(subject + "/map/%s" % random.randrange(999999999))
+                adds.update([
+                    (subject, predicate, setting, context),
+                    (setting, RDF.type, nodeClass, context),
+                    (setting, keyPred, newKey, context),
+                ])
+
+        with self.currentState(tripleFilter=(setting, valuePred, None)) as current:
+            dels = set([])
+            for prev in current.objects(setting, valuePred):
+                dels.add((setting, valuePred, prev, context))
+            adds.add((setting, valuePred, newValue, context))
+
+            if adds != dels:
+                self.patch(Patch(delQuads=dels, addQuads=adds))
+
+    def removeMappingNode(self, context, node):
+        """
+        removes the statements with this node as subject or object, which
+        is the right amount of statements to remove a node that
+        patchMapping made.
+        """
+        p = Patch(delQuads=[
+            spo + (context,)
+            for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context))
+        ])
+        self.patch(p)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/grapheditapi_test.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,30 @@
+import unittest
+
+from rdfdb.syncedgraph.grapheditapi import GraphEditApi
+from rdflib import ConjunctiveGraph, URIRef
+
+
+class TestPatchSubgraph(unittest.TestCase):
+
+    def testCollapsesIdenticalQuads(self):
+        appliedPatches = []
+
+        class Obj(GraphEditApi):
+
+            def __init__(self):
+                pass
+
+            def patch(self, p):
+                appliedPatches.append(p)
+
+            _graph: ConjunctiveGraph
+
+        obj = Obj()
+        obj._graph = ConjunctiveGraph()
+        stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
+        obj._graph.addN([stmt1])
+        obj.patchSubgraph(URIRef('g'), [stmt1])
+        self.assertEqual(len(appliedPatches), 1)
+        p = appliedPatches[0]
+        self.assertTrue(p.isNoop())
+        self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/mock_syncedgraph.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,49 @@
+from rdflib import RDF, RDFS, Graph
+from rdflib.parser import StringInputSource
+
+
+class MockSyncedGraph(object):
+    """
+    Lets users of SyncedGraph mostly work. Doesn't yet help with any
+    testing of the rerun-upon-graph-change behavior.
+    """
+
+    def __init__(self, n3Content):
+        self._graph = Graph()
+        self._graph.parse(StringInputSource(n3Content), format='n3')
+
+    def addHandler(self, func):
+        func()
+
+    def value(self, subject=None, predicate=RDF.value, object=None, default=None, any=True):
+        if object is not None:
+            raise NotImplementedError()
+        return self._graph.value(subject, predicate, object=object, default=default, any=any)
+
+    def objects(self, subject=None, predicate=None):
+        return self._graph.objects(subject, predicate)
+
+    def label(self, uri):
+        return self.value(uri, RDFS.label)
+
+    def subjects(self, predicate=None, object=None):
+        return self._graph.subjects(predicate, object)
+
+    def predicate_objects(self, subject):
+        return self._graph.predicate_objects(subject)
+
+    def items(self, listUri):
+        """generator. Having a chain of watchers on the results is not
+        well-tested yet"""
+        chain = set([listUri])
+        while listUri:
+            item = self.value(listUri, RDF.first)
+            if item:
+                yield item
+            listUri = self.value(listUri, RDF.rest)
+            if listUri in chain:
+                raise ValueError("List contains a recursive rdf:rest reference")
+            chain.add(listUri)
+
+    def contains(self, triple):
+        return triple in self._graph
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/readonly_graph.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,17 @@
+class ReadOnlyConjunctiveGraph(object):
+    """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead
+    of a bunch of Graphs"""
+
+    def __init__(self, graph):
+        self.graph = graph
+
+    def __getattr__(self, attr):
+        if attr in ['subjects', 'value', 'objects', 'triples', 'label']:  # not complete
+            return getattr(self.graph, attr)
+        raise TypeError("can't access %r of read-only graph" % attr)
+
+    def __len__(self):
+        return len(self.graph)
+
+    def contextsForStatement(self, stmt):
+        raise NotImplementedError
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/syncedgraph.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,42 @@
+"""
+client code uses a SyncedGraph, which has a few things:
+
+AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a
+way that lets me call you again when there were changes to the things
+you queried
+
+CurrentStateGraphApi - a way to query the graph that doesn't gather
+your dependencies like AutoDepGraphApi does
+
+GraphEditApi - methods to write patches to the graph for common
+operations, e.g. replacing a value, or editing a mapping
+
+WsClientProtocol one connection with the rdfdb server.
+"""
+
+from typing import Optional
+
+# everybody who writes literals needs to get this
+from rdfdb.rdflibpatch_literal import patch
+from rdfdb.syncedgraph.autodepgraphapi import AutoDepGraphApi
+from rdfdb.syncedgraph.grapheditapi import GraphEditApi
+from rdfdb.syncedgraph.syncedgraph_base import SyncedGraphBase
+from rdflib import URIRef
+
+patch()
+
+
+class SyncedGraph(AutoDepGraphApi, GraphEditApi):
+    '''
+    SyncedGraphBase
+       |       |
+    CurState AutoDep
+       |       |
+    GraphEdit  |
+          |    |
+       SyncedGraph
+    '''
+
+    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
+        SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost)
+        AutoDepGraphApi.__init__(self)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph/syncedgraph_base.py	Sat May 21 22:15:14 2022 -0700
@@ -0,0 +1,230 @@
+import asyncio
+import json
+import logging
+import traceback
+from typing import Any, Optional, cast
+
+import aiohttp
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads
+from rdflib import ConjunctiveGraph, URIRef
+
+log = logging.getLogger('syncedgraph')
+
+# class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
+#     """The server for this is service.WebsocketClient"""
+
+#     def __init__(self, sg):
+#         super().__init__()
+#         self.sg = sg
+#         self.sg.currentClient = self
+#         self.connectionId = None
+
+#     def onConnect(self, response):
+#         log.info('conn %r', response)
+
+#     def onOpen(self):
+#         log.info('ws open')
+#         self.sg.isConnected = True
+
+#     def onMessage(self, payload, isBinary):
+#         msg = json.loads(payload)
+#         if 'connectedAs' in msg:
+#             self.connectionId = msg['connectedAs']
+#             log.info(f'rdfdb calls us {self.connectionId}')
+#         elif 'patch' in msg:
+#             p = Patch(jsonRepr=payload.decode('utf8'))
+#             log.debug("received patch %s", p.shortSummary())
+#             self.sg.onPatchFromDb(p)
+#         else:
+#             log.warn('unknown msg from websocket: %s...', payload[:32])
+
+#     def sendPatch(self, p: Patch):
+#         # this is where we could concatenate little patches into a
+#         # bigger one. Often, many statements will cancel each
+#         # other out.
+
+#         # also who's going to accumulate patches when server is down,
+#         # or is that not allowed?
+#         if self.connectionId is None:
+#             raise ValueError("can't send patches before we get an id")
+#         body = p.makeJsonRepr()
+#         log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
+#         self.sendMessage(body.encode('utf8'))
+
+#     def onClose(self, wasClean, code, reason):
+#         log.info("WebSocket connection closed: {0}".format(reason))
+#         self.sg.lostRdfdbConnection()
+
+# reactor = cast(IReactorCore, twisted.internet.reactor)
+
+
+class SyncedGraphBase(object):
+    """
+    graph for clients to use. Changes are synced with the master graph
+    in the rdfdb process.
+
+    self.patch(p: Patch) is the only way to write to the graph.
+
+    Reading can be done with the AutoDepGraphApi methods which set up
+    watchers to call you back when the results of the read have
+    changed (like knockoutjs). Or you can read with
+    CurrentStateGraphApi which doesn't have watchers, but you have to
+    opt into using it so it's clear you aren't in an auto-dep context
+    and meant to set up watchers.
+
+    You may want to attach to self.initiallySynced deferred so you
+    don't attempt patches before we've heard the initial contents of
+    the graph. It would be ok to accumulate some patches of new
+    material, but usually you won't correctly remove the existing
+    statements unless we have the correct graph.
+
+    If we get out of sync, we abandon our local graph (even any
+    pending local changes) and get the data again from the server.
+    """
+
+    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
+        """
+        label is a string that the server will display in association
+        with your connection
+
+        receiverHost is the hostname other nodes can use to talk to me
+        """
+        self.rdfdbRoot = rdfdbRoot
+        self.httpSession = aiohttp.ClientSession()
+        self._senderTask = asyncio.create_task(self._sender())
+
+        self._initiallySynced = asyncio.Future()
+        self._graph = ConjunctiveGraph()
+
+        # todo:
+        # AutoDepGraphApi.__init__(self)
+
+        # this needs more state to track if we're doing a resync (and
+        # everything has to error or wait) or if we're live
+
+    async def _sender(self):
+        while True:
+            with self.httpSession.ws_connect(self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph') as ws:
+                async for msg in ws:
+                    log.info(f"server sent us {msg=}")
+                    # if msg.type == aiohttp.WSMsgType.TEXT:
+                    #     if msg.data == 'close cmd':
+                    #         await ws.close()
+                    #         break
+                    #     else:
+                    #         await ws.send_str(msg.data + '/answer')
+                    # elif msg.type == aiohttp.WSMsgType.ERROR:
+                    #     break
+            self.lostRdfdbConnection()
+            log.info("lost connection- retry")
+            await asyncio.sleep(4)
+
+    async def init(self):
+        """return when we have the initial graph from server.
+
+        maybe this isn't really needed, as everything ought to be resilent
+        to the intial graph pouring in.
+        """
+        await self._initiallySynced
+
+    def lostRdfdbConnection(self) -> None:
+        self.isConnected = False
+        self.patch(Patch(delQuads=self._graph.quads()))
+        log.info(f'cleared graph to {len(self._graph)}')
+        log.error('graph is not updating- you need to restart')
+
+    def resync(self):
+        """
+        get the whole graph again from the server (e.g. we had a
+        conflict while applying a patch and want to return to the
+        truth).
+
+        To avoid too much churn, we remember our old graph and diff it
+        against the replacement. This way, our callers only see the
+        corrections.
+
+        Edits you make during a resync will surely be lost, so I
+        should just fail them. There should be a notification back to
+        UIs who want to show that we're doing a resync.
+        """
+        log.info('resync')
+        if self.currentClient:
+            self.currentClient.dropConnection()
+
+    def _resyncGraph(self, response):
+        log.warn("new graph in")
+
+        if self.currentClient:
+            self.currentClient.dropConnection()
+        # diff against old entire graph
+        # broadcast that change
+
+    def runDepsOnNewPatch(self, p):
+        # See AutoDepGraphApi
+        pass
+
+    def patch(self, p: Patch) -> None:
+        """send this patch to the server and apply it to our local
+        graph and run handlers"""
+
+        if not self.isConnected or self.currentClient is None:
+            log.warn("not currently connected- dropping patch")
+            return
+
+        if p.isNoop():
+            log.info("skipping no-op patch")
+            return
+
+        # these could fail if we're out of sync. One approach:
+        # Rerequest the full state from the server, try the patch
+        # again after that, then give up.
+        debugKey = '[id=%s]' % (id(p) % 1000)
+        log.debug("\napply local patch %s %s", debugKey, p)
+        try:
+            self._applyPatchLocally(p)
+        except ValueError as e:
+            log.error(e)
+            self.resync()
+            return
+        log.debug('runDepsOnNewPatch')
+        self.runDepsOnNewPatch(p)
+        log.debug('sendPatch')
+        self.currentClient.sendPatch(p)
+        log.debug('patch is done %s', debugKey)
+
+    async def suggestPrefixes(self, ctx, prefixes):
+        """
+        when writing files for this ctx, try to use these n3
+        prefixes. async, not guaranteed to finish before any
+        particular file flush
+        """
+        await self.httpSession.post(self.rdfdbRoot + 'prefixes', data=json.dumps({'ctx': ctx, 'prefixes': prefixes}))
+
+    def _applyPatchLocally(self, p: Patch):
+        # .. and disconnect on failure
+        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        log.debug("graph now has %s statements" % len(self._graph))
+
+    def onPatchFromDb(self, p):
+        """
+        central server has sent us a patch
+        """
+        if log.isEnabledFor(logging.DEBUG):
+            if len(p.addQuads) > 50:
+                log.debug('server has sent us %s', p.shortSummary())
+            else:
+                log.debug('server has sent us %s', p)
+
+        self._applyPatchLocally(p)
+        try:
+            self.runDepsOnNewPatch(p)
+        except Exception:
+            # don't reflect this error back to the server; we did
+            # receive its patch correctly. However, we're in a bad
+            # state since some dependencies may not have rerun
+            traceback.print_exc()
+            log.warn("some graph dependencies may not have completely run")
+
+        if not self._initiallySynced.done():
+            self._initiallySynced.set_result(None)
--- a/rdfdb/syncedgraph_base.py	Sun May 15 15:52:01 2022 -0700
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,226 +0,0 @@
-import json
-import logging
-import traceback
-import urllib.parse
-from typing import Any, Optional, cast
-
-import autobahn.twisted.websocket
-import treq
-import twisted.internet.reactor
-from rdflib import ConjunctiveGraph, URIRef
-from twisted.internet import defer
-from twisted.internet.interfaces import IReactorCore
-
-from rdfdb.patch import Patch
-from rdfdb.rdflibpatch import patchQuads
-
-
-class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
-    """The server for this is service.WebsocketClient"""
-
-    def __init__(self, sg):
-        super().__init__()
-        self.sg = sg
-        self.sg.currentClient = self
-        self.connectionId = None
-
-    def onConnect(self, response):
-        log.info('conn %r', response)
-
-    def onOpen(self):
-        log.info('ws open')
-        self.sg.isConnected = True
-
-    def onMessage(self, payload, isBinary):
-        msg = json.loads(payload)
-        if 'connectedAs' in msg:
-            self.connectionId = msg['connectedAs']
-            log.info(f'rdfdb calls us {self.connectionId}')
-        elif 'patch' in msg:
-            p = Patch(jsonRepr=payload.decode('utf8'))
-            log.debug("received patch %s", p.shortSummary())
-            self.sg.onPatchFromDb(p)
-        else:
-            log.warn('unknown msg from websocket: %s...', payload[:32])
-
-    def sendPatch(self, p: Patch):
-        # this is where we could concatenate little patches into a
-        # bigger one. Often, many statements will cancel each
-        # other out.
-
-        # also who's going to accumulate patches when server is down,
-        # or is that not allowed?
-        if self.connectionId is None:
-            raise ValueError("can't send patches before we get an id")
-        body = p.makeJsonRepr()
-        log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
-        self.sendMessage(body.encode('utf8'))
-
-    def onClose(self, wasClean, code, reason):
-        log.info("WebSocket connection closed: {0}".format(reason))
-        self.sg.lostRdfdbConnection()
-
-
-reactor = cast(IReactorCore, twisted.internet.reactor)
-
-log = logging.getLogger('syncedgraph')
-
-
-class SyncedGraphBase(object):
-    """
-    graph for clients to use. Changes are synced with the master graph
-    in the rdfdb process.
-
-    self.patch(p: Patch) is the only way to write to the graph.
-
-    Reading can be done with the AutoDepGraphApi methods which set up
-    watchers to call you back when the results of the read have
-    changed (like knockoutjs). Or you can read with
-    CurrentStateGraphApi which doesn't have watchers, but you have to
-    opt into using it so it's clear you aren't in an auto-dep context
-    and meant to set up watchers.
-
-    You may want to attach to self.initiallySynced deferred so you
-    don't attempt patches before we've heard the initial contents of
-    the graph. It would be ok to accumulate some patches of new
-    material, but usually you won't correctly remove the existing
-    statements unless we have the correct graph.
-
-    If we get out of sync, we abandon our local graph (even any
-    pending local changes) and get the data again from the server.
-    """
-
-    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
-        """
-        label is a string that the server will display in association
-        with your connection
-
-        receiverHost is the hostname other nodes can use to talk to me
-        """
-        self.isConnected = False
-        self.currentClient: Optional[WsClientProtocol] = None
-        self.rdfdbRoot = rdfdbRoot
-        self.connectSocket()
-        self.initiallySynced: defer.Deferred[None] = defer.Deferred()
-        self._graph = ConjunctiveGraph()
-
-        # todo:
-        # AutoDepGraphApi.__init__(self)
-
-        # this needs more state to track if we're doing a resync (and
-        # everything has to error or wait) or if we're live
-
-    def lostRdfdbConnection(self) -> None:
-        self.isConnected = False
-        self.patch(Patch(delQuads=self._graph.quads()))
-        log.info(f'cleared graph to {len(self._graph)}')
-        log.error('graph is not updating- you need to restart')
-        self.connectSocket()
-
-    def connectSocket(self) -> None:
-        factory = autobahn.twisted.websocket.WebSocketClientFactory(
-            self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
-            # Don't know if this is required by spec, but
-            # cyclone.websocket breaks with no origin header.
-            origin='foo')
-        factory.protocol = cast(Any, lambda: WsClientProtocol(self))
-
-        rr = urllib.parse.urlparse(self.rdfdbRoot)
-        reactor.connectTCP(rr.hostname, rr.port, factory)
-        # WsClientProtocol sets our currentClient. Needs rewrite using agents.
-
-    def resync(self):
-        """
-        get the whole graph again from the server (e.g. we had a
-        conflict while applying a patch and want to return to the
-        truth).
-
-        To avoid too much churn, we remember our old graph and diff it
-        against the replacement. This way, our callers only see the
-        corrections.
-
-        Edits you make during a resync will surely be lost, so I
-        should just fail them. There should be a notification back to
-        UIs who want to show that we're doing a resync.
-        """
-        log.info('resync')
-        if self.currentClient:
-            self.currentClient.dropConnection()
-
-    def _resyncGraph(self, response):
-        log.warn("new graph in")
-
-        if self.currentClient:
-            self.currentClient.dropConnection()
-        # diff against old entire graph
-        # broadcast that change
-
-    def runDepsOnNewPatch(self, p):
-        # See AutoDepGraphApi
-        pass
-
-    def patch(self, p: Patch) -> None:
-        """send this patch to the server and apply it to our local
-        graph and run handlers"""
-
-        if not self.isConnected or self.currentClient is None:
-            log.warn("not currently connected- dropping patch")
-            return
-
-        if p.isNoop():
-            log.info("skipping no-op patch")
-            return
-
-        # these could fail if we're out of sync. One approach:
-        # Rerequest the full state from the server, try the patch
-        # again after that, then give up.
-        debugKey = '[id=%s]' % (id(p) % 1000)
-        log.debug("\napply local patch %s %s", debugKey, p)
-        try:
-            self._applyPatchLocally(p)
-        except ValueError as e:
-            log.error(e)
-            self.resync()
-            return
-        log.debug('runDepsOnNewPatch')
-        self.runDepsOnNewPatch(p)
-        log.debug('sendPatch')
-        self.currentClient.sendPatch(p)
-        log.debug('patch is done %s', debugKey)
-
-    def suggestPrefixes(self, ctx, prefixes):
-        """
-        when writing files for this ctx, try to use these n3
-        prefixes. async, not guaranteed to finish before any
-        particular file flush
-        """
-        treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8'))
-
-    def _applyPatchLocally(self, p: Patch):
-        # .. and disconnect on failure
-        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        log.debug("graph now has %s statements" % len(self._graph))
-
-    def onPatchFromDb(self, p):
-        """
-        central server has sent us a patch
-        """
-        if log.isEnabledFor(logging.DEBUG):
-            if len(p.addQuads) > 50:
-                log.debug('server has sent us %s', p.shortSummary())
-            else:
-                log.debug('server has sent us %s', p)
-
-        self._applyPatchLocally(p)
-        try:
-            self.runDepsOnNewPatch(p)
-        except Exception:
-            # don't reflect this error back to the server; we did
-            # receive its patch correctly. However, we're in a bad
-            # state since some dependencies may not have rerun
-            traceback.print_exc()
-            log.warn("some graph dependencies may not have completely run")
-
-        if self.initiallySynced:
-            self.initiallySynced.callback(None)
-            self.initiallySynced = None