changeset 89:1120c6489888

refactor, redo the SyncedGraph class ordering, fix most typing errors
author drewp@bigasterisk.com
date Tue, 05 Apr 2022 00:54:53 -0700
parents 219d9e89b15c
children f9282b33b8d0
files rdfdb/autodepgraphapi.py rdfdb/currentstategraphapi.py rdfdb/grapheditapi.py rdfdb/grapheditapi_test.py rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/service.py rdfdb/syncedgraph.py rdfdb/syncedgraph_base.py
diffstat 9 files changed, 372 insertions(+), 320 deletions(-) [+]
line wrap: on
line diff
--- a/rdfdb/autodepgraphapi.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/autodepgraphapi.py	Tue Apr 05 00:54:53 2022 -0700
@@ -1,5 +1,6 @@
 import logging
 from typing import Callable, Dict, List, Set, Tuple
+from rdfdb.syncedgraph_base import SyncedGraphBase
 
 from rdflib import RDF, RDFS, URIRef
 
@@ -8,7 +9,7 @@
 log = logging.getLogger('autodepgraphapi')
 
 
-class AutoDepGraphApi(object):
+class AutoDepGraphApi(SyncedGraphBase):
     """
     knockoutjs-inspired API for automatically building a dependency
     tree while reading the graph. See addHandler().
@@ -18,7 +19,7 @@
     section of code. This is supposed to make it easier to notice
     dependency mistakes, especially when porting old code to use
     SyncedGraph.
-    
+
     This class is a mixin for SyncedGraph, separated here because
     these methods work together
     """
@@ -137,7 +138,7 @@
         """currently this needs to be in an addHandler section, but it
         sets no watchers so it won't actually update if the statement
         was added or dropped from contexts"""
-        #func = self._getCurrentFunc()
+        # func = self._getCurrentFunc()
         return contextsForStatementNoWildcards(self._graph, triple)
 
     # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
@@ -184,7 +185,7 @@
 
         this removes the handlers that it gives you
         """
-        #self.dependencies()
+        # self.dependencies()
         ret: Set[Callable[[], None]] = set()
         affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads])
         for (s, p), funcs in self._handlersSp.items():
--- a/rdfdb/currentstategraphapi.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/currentstategraphapi.py	Tue Apr 05 00:54:53 2022 -0700
@@ -2,21 +2,63 @@
 import logging
 import time
 import traceback
+from typing import Optional, Set, Tuple
 
 from rdflib import ConjunctiveGraph, URIRef
+from rdflib.term import Node
 
 from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
+from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph
+from rdfdb.syncedgraph_base import SyncedGraphBase
 
 log = logging.getLogger("currentstate")
 
+TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]]
 
 
-class CurrentStateGraphApi(object):
+class Mgr(object):
+
+    def __init__(self, graph, tripleFilter):
+        self._graph = graph
+        self._tripleFilter = tripleFilter
+
+    def __enter__(self):
+        # this should be a readonly view of the existing
+        # graph, maybe with something to guard against
+        # writes/patches happening while reads are being
+        # done. Typical usage will do some reads on this graph
+        # before moving on to writes.
+
+        if True:
+            g = ReadOnlyConjunctiveGraph(self._graph)
+        else:
+            t1 = time.time()
+            g = ConjunctiveGraph()
+            for s, p, o, c in self._graph.quads(self._tripleFilter):
+                g.store.add((s, p, o), c)
+
+            if self._tripleFilter == (None, None, None):
+                self.logThisCopy(g, time.time() - t1)
+
+        g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt)
+        return g
+
+    def logThisCopy(self, g, sec):
+        log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
+        for frame in traceback.format_stack(limit=4)[:-2]:
+            for line in frame.splitlines():
+                log.info("  " + line)
+
+    def __exit__(self, type, val, tb):
+        return
+
+
+class CurrentStateGraphApi(SyncedGraphBase):
     """
     mixin for SyncedGraph, separated here because these methods work together
     """
 
-    def currentState(self, context=None, tripleFilter=(None, None, None)):
+    def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr:
         """
         a graph you can read without being in an addHandler
 
@@ -25,43 +67,11 @@
         if context is not None:
             raise NotImplementedError("currentState with context arg")
 
-        class Mgr(object):
-
-            def __enter__(self2):
-                # this should be a readonly view of the existing
-                # graph, maybe with something to guard against
-                # writes/patches happening while reads are being
-                # done. Typical usage will do some reads on this graph
-                # before moving on to writes.
-
-                if 1:
-                    g = ReadOnlyConjunctiveGraph(self._graph)
-                else:
-                    t1 = time.time()
-                    g = ConjunctiveGraph()
-                    for s, p, o, c in self._graph.quads(tripleFilter):
-                        g.store.add((s, p, o), c)
+        return Mgr(self._graph, tripleFilter)
 
-                    if tripleFilter == (None, None, None):
-                        self2.logThisCopy(g, time.time() - t1)
-
-                setattr(g, 'contextsForStatement', lambda t: contextsForStatementNoWildcards(g, t))
-                return g
+    _reservedSequentials: Optional[Set[URIRef]] = None
 
-            def logThisCopy(self, g, sec):
-                log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
-                for frame in traceback.format_stack(limit=4)[:-2]:
-                    for line in frame.splitlines():
-                        log.info("  " + line)
-
-            def __exit__(self, type, val, tb):
-                return
-
-        return Mgr()
-
-    _reservedSequentials = None  # Optional[Set[URIRef]]
-
-    def sequentialUri(self, prefix):
+    def sequentialUri(self, prefix: URIRef) -> URIRef:
         """
         Prefix URIRef like http://example.com/r- will return
         http://example.com/r-1 if that uri is not a subject in the graph,
@@ -74,6 +84,7 @@
             if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))):
                 self._reservedSequentials.add(newUri)
                 return newUri
+        raise NotImplementedError("never reached")
 
 
 def contextsForStatementNoWildcards(g, triple):
--- a/rdfdb/grapheditapi.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/grapheditapi.py	Tue Apr 05 00:54:53 2022 -0700
@@ -1,6 +1,8 @@
 import logging
 import random
 from itertools import chain
+from rdfdb.currentstategraphapi import CurrentStateGraphApi
+from rdfdb.syncedgraph_base import SyncedGraphBase
 
 from rdflib import RDF, URIRef
 from rdflib.term import Node
@@ -10,14 +12,14 @@
 log = logging.getLogger('graphedit')
 
 
-class GraphEditApi(object):
+class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase):
     """
     fancier graph edits
-    
+
     mixin for SyncedGraph, separated here because these methods work together
     """
 
-    def getObjectPatch(self, context, subject, predicate, newObject):
+    def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch:
         """send a patch which removes existing values for (s,p,*,c)
         and adds (s,p,newObject,c). Values in other graphs are not affected.
 
@@ -100,31 +102,3 @@
             for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context))
         ])
         self.patch(p)
-
-
-import unittest
-
-from rdflib import ConjunctiveGraph
-
-
-class TestPatchSubgraph(unittest.TestCase):
-
-    def testCollapsesIdenticalQuads(self):
-        appliedPatches = []
-
-        class Obj(GraphEditApi):
-
-            def patch(self, p):
-                appliedPatches.append(p)
-
-            _graph: ConjunctiveGraph
-
-        obj = Obj()
-        obj._graph = ConjunctiveGraph()
-        stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
-        obj._graph.addN([stmt1])
-        obj.patchSubgraph(URIRef('g'), [stmt1])
-        self.assertEqual(len(appliedPatches), 1)
-        p = appliedPatches[0]
-        self.assertTrue(p.isNoop())
-        self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/grapheditapi_test.py	Tue Apr 05 00:54:53 2022 -0700
@@ -0,0 +1,30 @@
+import unittest
+
+from rdflib import ConjunctiveGraph, URIRef
+
+from rdfdb.grapheditapi import GraphEditApi
+
+
+class TestPatchSubgraph(unittest.TestCase):
+
+    def testCollapsesIdenticalQuads(self):
+        appliedPatches = []
+
+        class Obj(GraphEditApi):
+            def __init__(self):
+                pass
+
+            def patch(self, p):
+                appliedPatches.append(p)
+
+            _graph: ConjunctiveGraph
+
+        obj = Obj()
+        obj._graph = ConjunctiveGraph()
+        stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
+        obj._graph.addN([stmt1])
+        obj.patchSubgraph(URIRef('g'), [stmt1])
+        self.assertEqual(len(appliedPatches), 1)
+        p = appliedPatches[0]
+        self.assertTrue(p.isNoop())
+        self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- a/rdfdb/graphfile.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/graphfile.py	Tue Apr 05 00:54:53 2022 -0700
@@ -3,6 +3,7 @@
 import time
 import traceback
 from typing import Dict, Optional, Protocol, cast
+from pathlib import Path
 
 import twisted.internet.reactor
 from rdflib import Graph, URIRef
@@ -39,7 +40,7 @@
     one rdf file that we read from, write to, and notice external changes to
     """
 
-    def __init__(self, notifier: INotify, path: str, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef],
+    def __init__(self, notifier: INotify, path: Path, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef],
                  ctxPrefixes: Dict[str, URIRef]):
         """
         uri is the context for the triples in this file. We assume
@@ -205,7 +206,7 @@
     def flush(self) -> None:
         self.writeCall = None
 
-        tmpOut = self.path + ".rdfdb-temp"
+        tmpOut = str(self.path) + ".rdfdb-temp"
         f = open(tmpOut, 'wb')
         t1 = time.time()
         for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
--- a/rdfdb/graphfile_test.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/graphfile_test.py	Tue Apr 05 00:54:53 2022 -0700
@@ -1,13 +1,15 @@
 import tempfile
 import unittest
+from pathlib import Path
 from typing import cast
 
 import mock
+from _pytest.assertion import truncate
 from rdflib import Graph, URIRef
 from twisted.internet.inotify import INotify
 
 from rdfdb.graphfile import GraphFile
-from _pytest.assertion import truncate
+
 truncate.DEFAULT_MAX_LINES = 9999
 truncate.DEFAULT_MAX_CHARS = 9999
 
@@ -26,7 +28,7 @@
         def getSubgraph(uri):
             return Graph()
 
-        gf = GraphFile(cast(INotify, mock.Mock()), tf.name, URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
+        gf = GraphFile(cast(INotify, mock.Mock()), Path(tf.name), URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
         gf.reread()
 
         newGraph = Graph()
@@ -35,8 +37,7 @@
         gf.flush()
         wroteContent = open(tf.name, 'rb').read()
         print(wroteContent)
-        self.assertEqual(
-            b'''@prefix : <http://example.com/> .
+        self.assertEqual(b'''@prefix : <http://example.com/> .
 @prefix n: <http://example.com/n/> .
 
 :boo n:two <http://example.com/other/ns> .
--- a/rdfdb/service.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/service.py	Tue Apr 05 00:54:53 2022 -0700
@@ -4,22 +4,24 @@
 import optparse
 import os
 import sys
+from pathlib import Path
 from typing import Dict, List, Optional, cast
 
 import cyclone.web
 import cyclone.websocket
 import twisted.internet.error
+import twisted.internet.reactor
 from rdflib import ConjunctiveGraph, Graph, URIRef
-import twisted.internet.reactor
 from twisted.internet.inotify import IN_CREATE, INotify
+from twisted.internet.interfaces import IReactorCore
 from twisted.python.failure import Failure
 from twisted.python.filepath import FilePath
-from twisted.internet.interfaces import IReactorCore
 
 from rdfdb.file_vs_uri import (DirUriMap, correctToTopdirPrefix, fileForUri, uriFromFile)
 from rdfdb.graphfile import GetSubgraph, GraphFile, PatchCb
 from rdfdb.patch import ALLSTMTS, Patch
 from rdfdb.rdflibpatch import patchQuads
+
 reactor = cast(IReactorCore, twisted.internet.reactor)
 
 # gatherProcessStats()
@@ -46,6 +48,9 @@
     pass
 
 
+CtxPrefixes = Dict[Optional[URIRef], Dict[str, URIRef]]
+
+
 class WatchedFiles(object):
     """
     find files, notice new files.
@@ -53,7 +58,7 @@
     This object watches directories. Each GraphFile watches its own file.
     """
 
-    def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: Dict[str, URIRef]):
+    def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: CtxPrefixes):
         self.dirUriMap = dirUriMap  # {abspath : uri prefix}
         self.patch, self.getSubgraph = patch, getSubgraph
         self.addlPrefixes = addlPrefixes
@@ -70,10 +75,10 @@
         try:
             for topdir in self.dirUriMap:
                 for dirpath, dirnames, filenames in os.walk(topdir):
+                    dirpath = Path(dirpath)
                     for base in filenames:
-                        p = os.path.join(dirpath, base)
+                        p = dirpath / base
                         # why wasn't mypy catching this?
-                        assert isinstance(p, bytes)
                         self.watchFile(p)
                     self.notifier.watch(FilePath(dirpath), autoAdd=True, callbacks=[self.dirChange])
         finally:
@@ -81,13 +86,13 @@
 
     def dirChange(self, watch, path: FilePath, mask):
         if mask & IN_CREATE:
-            if cast(str, path.path).endswith((b'~', b'.swp', b'swx', b'.rdfdb-temp')):
+            if cast(str, path.path).endswith(('~', '.swp', 'swx', '.rdfdb-temp')):
                 return
 
             log.debug("%s created; consider adding a watch", path)
-            self.watchFile(path.path)
+            self.watchFile(Path(cast(str, path.path)))
 
-    def watchFile(self, inFile: bytes):
+    def watchFile(self, inFile: Path):
         """
         consider adding a GraphFile to self.graphFiles
 
@@ -99,10 +104,10 @@
             return
 
         inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
-        if os.path.splitext(inFile)[1] not in [b'.n3']:
+        if os.path.splitext(inFile)[1] not in ['.n3']:
             return
 
-        if b'/capture/' in inFile:
+        if '/capture/' in str(inFile):
             # smaller graph for now
             return
 
@@ -111,13 +116,13 @@
         # SyncedGraph calls graphFromNQuad on the incoming data and
         # has a parse error. I'm not sure where this should be fixed
         # yet.
-        if b'-rules' in inFile:
+        if '-rules' in str(inFile):
             return
 
         # for legacy versions, compile all the config stuff you want
         # read into one file called config.n3. New versions won't read
         # it.
-        if inFile.endswith(b"config.n3"):
+        if inFile.name == "config.n3":
             return
 
         ctx = uriFromFile(self.dirUriMap, inFile)
@@ -138,19 +143,16 @@
         """
         if ctx not in self.graphFiles:
             outFile = fileForUri(self.dirUriMap, ctx)
-            # mypy missed the next line because of
-            # https://github.com/python/typeshed/issues/2937 ('str in
-            # bytes' isn't an error)
-            assert b'//' not in outFile, (outFile, self.dirUriMap, ctx)
+            assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
             log.info("starting new file %r", outFile)
             self._addGraphFile(ctx, outFile)
 
-    def _addGraphFile(self, ctx, path):
+    def _addGraphFile(self, ctx: URIRef, path: Path):
         self.addlPrefixes.setdefault(ctx, {})
         self.addlPrefixes.setdefault(None, {})
         gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
         self.graphFiles[ctx] = gf
-        fileStats.mappedGraphFiles = len(self.graphFiles)
+        # fileStats.mappedGraphFiles = len(self.graphFiles)
         return gf
 
     def dirtyFiles(self, ctxs):
@@ -221,10 +223,14 @@
     def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
         self.clients: List[WebsocketClient] = []
         self.graph = ConjunctiveGraph()
+
         # stats.graphLen = len(self.graph)
         # stats.clients = len(self.clients)
 
-        self.watchedFiles = WatchedFiles(dirUriMap, self.patch, self.getSubgraph, addlPrefixes)
+        def callPatch(patch: Patch, dueToFileChange: bool = False):
+            self.patch(patch, dueToFileChange=dueToFileChange)
+
+        self.watchedFiles = WatchedFiles(dirUriMap, callPatch, self.getSubgraph, addlPrefixes)
 
         self.summarizeToLog()
 
@@ -245,12 +251,12 @@
         # an error here needs to drop the sender, and reset everyone
         # else if we can't rollback the failing patch.
         patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
-        stats.graphLen = len(self.graph)
+        # stats.graphLen = len(self.graph)
 
         self._syncPatchToOtherClients(patch, sender)
         if not dueToFileChange:
             self.watchedFiles.dirtyFiles([ctx])
-        graphStats.statements = len(self.graph)
+        # graphStats.statements = len(self.graph)
 
     def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
         for c in self.clients:
@@ -266,12 +272,15 @@
         log.info("%r %r - dropping client", c, err.getErrorMessage())
         if c in self.clients:
             self.clients.remove(c)
-        stats.clients = len(self.clients)
+        # stats.clients = len(self.clients)
 
     def summarizeToLog(self):
         log.info("contexts in graph (%s total stmts):" % len(self.graph))
         for c in self.graph.contexts():
-            log.info("  %s: %s statements" % (c.identifier, len(self.getSubgraph(c.identifier))))
+            ci = cast(URIRef, c.identifier)
+            g = self.getSubgraph(ci)
+            n = g.__len__()
+            log.info("  %s: %s statements" % (c.identifier, n))
 
     def getSubgraph(self, uri: URIRef) -> Graph:
         """
@@ -282,7 +291,7 @@
         and it's returning triples, but I think quads would be better
         """
         # this is returning an empty Graph :(
-        #return self.graph.get_context(uri)
+        # return self.graph.get_context(uri)
 
         g = Graph()
         for s in self.graph.triples(ALLSTMTS, uri):
@@ -293,7 +302,7 @@
         log.info("new connection: sending all graphs to %r" % newClient)
         newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
         self.clients.append(newClient)
-        stats.clients = len(self.clients)
+        # stats.clients = len(self.clients)
 
 
 class GraphResource(cyclone.web.RequestHandler):
@@ -335,7 +344,7 @@
 def main(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None, port=9999):
 
     if dirUriMap is None:
-        dirUriMap = {b'data/': URIRef('http://example.com/data/')}
+        dirUriMap = {Path('data/'): URIRef('http://example.com/data/')}
     if prefixes is None:
         prefixes = {
             'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
@@ -379,4 +388,4 @@
 
 
 if __name__ == '__main__':
-    main()
\ No newline at end of file
+    main()
--- a/rdfdb/syncedgraph.py	Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/syncedgraph.py	Tue Apr 05 00:54:53 2022 -0700
@@ -14,228 +14,27 @@
 WsClientProtocol one connection with the rdfdb server.
 """
 
-import json
-import logging
-import traceback
-import urllib.parse
-from typing import Any, Optional, cast
-
-import autobahn.twisted.websocket
-import treq
-import twisted.internet.reactor
-from rdflib import ConjunctiveGraph, URIRef
-from twisted.internet import defer
-from twisted.internet.interfaces import IReactorCore
-
+from typing import Optional
 from rdfdb.autodepgraphapi import AutoDepGraphApi
-from rdfdb.currentstategraphapi import CurrentStateGraphApi
 from rdfdb.grapheditapi import GraphEditApi
-from rdfdb.patch import Patch
-from rdfdb.rdflibpatch import patchQuads
 # everybody who writes literals needs to get this
 from rdfdb.rdflibpatch_literal import patch
-
+from rdfdb.syncedgraph_base import SyncedGraphBase
+from rdflib import URIRef
 patch()
-reactor = cast(IReactorCore, twisted.internet.reactor)
-
-log = logging.getLogger('syncedgraph')
-
-
-class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
-    """The server for this is service.WebsocketClient"""
-
-    def __init__(self, sg):
-        super().__init__()
-        self.sg = sg
-        self.sg.currentClient = self
-        self.connectionId = None
-
-    def onConnect(self, response):
-        log.info('conn %r', response)
-
-    def onOpen(self):
-        log.info('ws open')
-        self.sg.isConnected = True
-
-    def onMessage(self, payload, isBinary):
-        msg = json.loads(payload)
-        if 'connectedAs' in msg:
-            self.connectionId = msg['connectedAs']
-            log.info(f'rdfdb calls us {self.connectionId}')
-        elif 'patch' in msg:
-            p = Patch(jsonRepr=payload.decode('utf8'))
-            log.debug("received patch %s", p.shortSummary())
-            self.sg.onPatchFromDb(p)
-        else:
-            log.warn('unknown msg from websocket: %s...', payload[:32])
-
-    def sendPatch(self, p: Patch):
-        # this is where we could concatenate little patches into a
-        # bigger one. Often, many statements will cancel each
-        # other out.
-
-        # also who's going to accumulate patches when server is down,
-        # or is that not allowed?
-        if self.connectionId is None:
-            raise ValueError("can't send patches before we get an id")
-        body = p.makeJsonRepr()
-        log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
-        self.sendMessage(body.encode('utf8'))
-
-    def onClose(self, wasClean, code, reason):
-        log.info("WebSocket connection closed: {0}".format(reason))
-        self.sg.lostRdfdbConnection()
 
 
-class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
-    """
-    graph for clients to use. Changes are synced with the master graph
-    in the rdfdb process.
-
-    self.patch(p: Patch) is the only way to write to the graph.
-
-    Reading can be done with the AutoDepGraphApi methods which set up
-    watchers to call you back when the results of the read have
-    changed (like knockoutjs). Or you can read with
-    CurrentStateGraphApi which doesn't have watchers, but you have to
-    opt into using it so it's clear you aren't in an auto-dep context
-    and meant to set up watchers.
-
-    You may want to attach to self.initiallySynced deferred so you
-    don't attempt patches before we've heard the initial contents of
-    the graph. It would be ok to accumulate some patches of new
-    material, but usually you won't correctly remove the existing
-    statements unless we have the correct graph.
-
-    If we get out of sync, we abandon our local graph (even any
-    pending local changes) and get the data again from the server.
-    """
-
+class SyncedGraph(AutoDepGraphApi, GraphEditApi):
+    '''
+    SyncedGraphBase
+       |       |
+    CurState AutoDep
+       |       |
+    GraphEdit  |
+          |    |
+       SyncedGraph
+    '''
     def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
-        """
-        label is a string that the server will display in association
-        with your connection
-
-        receiverHost is the hostname other nodes can use to talk to me
-        """
-        self.isConnected = False
-        self.currentClient: Optional[WsClientProtocol] = None
-        self.rdfdbRoot = rdfdbRoot
-        self.connectSocket()
-        self.initiallySynced: defer.Deferred[None] = defer.Deferred()
-        self._graph = ConjunctiveGraph()
-
+        SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost)
         AutoDepGraphApi.__init__(self)
-        # this needs more state to track if we're doing a resync (and
-        # everything has to error or wait) or if we're live
-
-    def lostRdfdbConnection(self) -> None:
-        self.isConnected = False
-        self.patch(Patch(delQuads=self._graph.quads()))
-        log.info(f'cleared graph to {len(self._graph)}')
-        log.error('graph is not updating- you need to restart')
-        self.connectSocket()
-
-    def connectSocket(self) -> None:
-        factory = autobahn.twisted.websocket.WebSocketClientFactory(
-            self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
-            # Don't know if this is required by spec, but
-            # cyclone.websocket breaks with no origin header.
-            origin='foo')
-        factory.protocol = cast(Any, lambda: WsClientProtocol(self))
-
-        rr = urllib.parse.urlparse(self.rdfdbRoot)
-        reactor.connectTCP(rr.hostname, rr.port, factory)
-        # WsClientProtocol sets our currentClient. Needs rewrite using agents.
-
-    def resync(self):
-        """
-        get the whole graph again from the server (e.g. we had a
-        conflict while applying a patch and want to return to the
-        truth).
-
-        To avoid too much churn, we remember our old graph and diff it
-        against the replacement. This way, our callers only see the
-        corrections.
 
-        Edits you make during a resync will surely be lost, so I
-        should just fail them. There should be a notification back to
-        UIs who want to show that we're doing a resync.
-        """
-        log.info('resync')
-        if self.currentClient:
-            self.currentClient.dropConnection()
-
-    def _resyncGraph(self, response):
-        log.warn("new graph in")
-
-        if self.currentClient:
-            self.currentClient.dropConnection()
-        # diff against old entire graph
-        # broadcast that change
-
-    def patch(self, p: Patch) -> None:
-        """send this patch to the server and apply it to our local
-        graph and run handlers"""
-
-        if not self.isConnected or self.currentClient is None:
-            log.warn("not currently connected- dropping patch")
-            return
-
-        if p.isNoop():
-            log.info("skipping no-op patch")
-            return
-
-        # these could fail if we're out of sync. One approach:
-        # Rerequest the full state from the server, try the patch
-        # again after that, then give up.
-        debugKey = '[id=%s]' % (id(p) % 1000)
-        log.debug("\napply local patch %s %s", debugKey, p)
-        try:
-            self._applyPatchLocally(p)
-        except ValueError as e:
-            log.error(e)
-            self.resync()
-            return
-        log.debug('runDepsOnNewPatch')
-        self.runDepsOnNewPatch(p)
-        log.debug('sendPatch')
-        self.currentClient.sendPatch(p)
-        log.debug('patch is done %s', debugKey)
-
-    def suggestPrefixes(self, ctx, prefixes):
-        """
-        when writing files for this ctx, try to use these n3
-        prefixes. async, not guaranteed to finish before any
-        particular file flush
-        """
-        treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8'))
-
-    def _applyPatchLocally(self, p: Patch):
-        # .. and disconnect on failure
-        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
-        log.debug("graph now has %s statements" % len(self._graph))
-
-    def onPatchFromDb(self, p):
-        """
-        central server has sent us a patch
-        """
-        if log.isEnabledFor(logging.DEBUG):
-            if len(p.addQuads) > 50:
-                log.debug('server has sent us %s', p.shortSummary())
-            else:
-                log.debug('server has sent us %s', p)
-
-        self._applyPatchLocally(p)
-        try:
-            self.runDepsOnNewPatch(p)
-        except Exception:
-            # don't reflect this error back to the server; we did
-            # receive its patch correctly. However, we're in a bad
-            # state since some dependencies may not have rerun
-            traceback.print_exc()
-            log.warn("some graph dependencies may not have completely run")
-
-        if self.initiallySynced:
-            self.initiallySynced.callback(None)
-            self.initiallySynced = None
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph_base.py	Tue Apr 05 00:54:53 2022 -0700
@@ -0,0 +1,269 @@
+import json
+import logging
+import traceback
+import urllib.parse
+from typing import Any, Optional, cast
+
+import autobahn.twisted.websocket
+import treq
+import twisted.internet.reactor
+from rdflib import ConjunctiveGraph, URIRef
+from twisted.internet import defer
+from twisted.internet.interfaces import IReactorCore
+
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads
+
+log = logging.getLogger('syncedgraph')
+
+reactor = cast(IReactorCore, twisted.internet.reactor)
+
+
+class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
+    """
+    Websocket connection from a SyncedGraphBase to the rdfdb server.
+
+    The server for this is service.WebsocketClient
+    """
+
+    def __init__(self, sg: 'SyncedGraphBase'):
+        super().__init__()
+        self.sg = sg
+        # the graph sends outgoing patches through its currentClient
+        self.sg.currentClient = self
+        # filled in by the server's 'connectedAs' message
+        self.connectionId = None
+
+    def onConnect(self, response):
+        log.info('conn %r', response)
+
+    def onOpen(self):
+        log.info('ws open')
+        self.sg.isConnected = True
+
+    def onMessage(self, payload, isBinary):
+        """
+        handle a json message from the server: either the id it gave
+        our connection, or a patch to apply to the graph
+        """
+        msg = json.loads(payload)
+        if 'connectedAs' in msg:
+            self.connectionId = msg['connectedAs']
+            log.info('rdfdb calls us %s', self.connectionId)
+        elif 'patch' in msg:
+            p = Patch(jsonRepr=payload.decode('utf8'))
+            log.debug("received patch %s", p.shortSummary())
+            self.sg.onPatchFromDb(p)
+        else:
+            log.warning('unknown msg from websocket: %s...', payload[:32])
+
+    def sendPatch(self, p: Patch):
+        """
+        send a patch to the server. Raises ValueError if the server
+        hasn't given us a connection id yet.
+        """
+        # this is where we could concatenate little patches into a
+        # bigger one. Often, many statements will cancel each
+        # other out.
+
+        # also who's going to accumulate patches when server is down,
+        # or is that not allowed?
+        if self.connectionId is None:
+            raise ValueError("can't send patches before we get an id")
+        body = p.makeJsonRepr()
+        log.debug('connectionId=%s sending patch %s bytes', self.connectionId, len(body))
+        self.sendMessage(body.encode('utf8'))
+
+    def onClose(self, wasClean, code, reason):
+        log.info("WebSocket connection closed: %s", reason)
+        self.sg.lostRdfdbConnection()
+
+
+class SyncedGraphBase(object):
+    """
+    graph for clients to use. Changes are synced with the master graph
+    in the rdfdb process.
+
+    self.patch(p: Patch) is the only way to write to the graph.
+
+    Reading can be done with the AutoDepGraphApi methods which set up
+    watchers to call you back when the results of the read have
+    changed (like knockoutjs). Or you can read with
+    CurrentStateGraphApi which doesn't have watchers, but you have to
+    opt into using it so it's clear you aren't in an auto-dep context
+    and meant to set up watchers.
+
+    You may want to attach to self.initiallySynced deferred so you
+    don't attempt patches before we've heard the initial contents of
+    the graph. It would be ok to accumulate some patches of new
+    material, but usually you won't correctly remove the existing
+    statements unless we have the correct graph.
+
+    If we get out of sync, we abandon our local graph (even any
+    pending local changes) and get the data again from the server.
+    """
+
+    def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
+        """
+        label is a string that the server will display in association
+        with your connection
+
+        receiverHost is the hostname other nodes can use to talk to me
+
+        (this class doesn't use label or receiverHost yet)
+        """
+        self.isConnected = False
+        self.currentClient: Optional[WsClientProtocol] = None
+        self.rdfdbRoot = rdfdbRoot
+        # fired, then replaced with None, by the first onPatchFromDb
+        self.initiallySynced: Optional[defer.Deferred[None]] = defer.Deferred()
+        self._graph = ConjunctiveGraph()
+        # connect last, so the rest of our state is already in place
+        self.connectSocket()
+
+        # todo:
+        # AutoDepGraphApi.__init__(self)
+
+        # this needs more state to track if we're doing a resync (and
+        # everything has to error or wait) or if we're live
+
+    def lostRdfdbConnection(self) -> None:
+        """
+        the websocket closed: empty our local graph (telling the
+        handlers) and start a new connection
+        """
+        self.isConnected = False
+        # This can't use self.patch(), which drops every patch while
+        # we're disconnected and is meant for edits that also go to
+        # the server. We only want to forget our local copy.
+        staleQuads = list(self._graph.quads())
+        if staleQuads:
+            clearAll = Patch(addQuads=[], delQuads=staleQuads)
+            try:
+                self._applyPatchLocally(clearAll)
+                self.runDepsOnNewPatch(clearAll)
+            except Exception:
+                # keep going; we still have to reconnect below
+                traceback.print_exc()
+                log.warning("failed to clear the local graph")
+        log.info(f'cleared graph to {len(self._graph)}')
+        log.error('graph is not updating- you need to restart')
+        self.connectSocket()
+
+    def connectSocket(self) -> None:
+        """
+        start a new websocket connection to the rdfdb server. The
+        WsClientProtocol made for it sets our currentClient.
+        """
+        factory = autobahn.twisted.websocket.WebSocketClientFactory(
+            self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
+            # Don't know if this is required by spec, but
+            # cyclone.websocket breaks with no origin header.
+            origin='foo')
+        factory.protocol = cast(Any, lambda: WsClientProtocol(self))
+
+        rr = urllib.parse.urlparse(self.rdfdbRoot)
+        # rr.port is None when rdfdbRoot doesn't name a port
+        reactor.connectTCP(rr.hostname, rr.port or 80, factory)
+        # Needs rewrite using agents.
+
+    def resync(self):
+        """
+        get the whole graph again from the server (e.g. we had a
+        conflict while applying a patch and want to return to the
+        truth).
+
+        To avoid too much churn, we remember our old graph and diff it
+        against the replacement. This way, our callers only see the
+        corrections.
+
+        Edits you make during a resync will surely be lost, so I
+        should just fail them. There should be a notification back to
+        UIs who want to show that we're doing a resync.
+
+        The diffing isn't written yet; for now this only drops the
+        current connection.
+        """
+        log.info('resync')
+        if self.currentClient:
+            self.currentClient.dropConnection()
+
+    def _resyncGraph(self, response):
+        log.warning("new graph in")
+
+        if self.currentClient:
+            self.currentClient.dropConnection()
+        # diff against old entire graph
+        # broadcast that change
+
+    def runDepsOnNewPatch(self, p):
+        # See AutoDepGraphApi
+        pass
+
+    def patch(self, p: Patch) -> None:
+        """send this patch to the server and apply it to our local
+        graph and run handlers"""
+
+        if not self.isConnected or self.currentClient is None:
+            log.warning("not currently connected- dropping patch")
+            return
+
+        if p.isNoop():
+            log.info("skipping no-op patch")
+            return
+
+        # these could fail if we're out of sync. One approach:
+        # Rerequest the full state from the server, try the patch
+        # again after that, then give up.
+        debugKey = '[id=%s]' % (id(p) % 1000)
+        log.debug("\napply local patch %s %s", debugKey, p)
+        try:
+            self._applyPatchLocally(p)
+        except ValueError as e:
+            log.error(e)
+            self.resync()
+            return
+        log.debug('runDepsOnNewPatch')
+        self.runDepsOnNewPatch(p)
+        log.debug('sendPatch')
+        # note: sendPatch raises ValueError if the server hasn't sent
+        # our connection id yet, after p is already applied locally
+        self.currentClient.sendPatch(p)
+        log.debug('patch is done %s', debugKey)
+
+    def suggestPrefixes(self, ctx, prefixes):
+        """
+        when writing files for this ctx, try to use these n3
+        prefixes. async, not guaranteed to finish before any
+        particular file flush
+        """
+        treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8'))
+
+    def _applyPatchLocally(self, p: Patch):
+        # .. and disconnect on failure
+        patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+        log.debug("graph now has %s statements", len(self._graph))
+
+    def onPatchFromDb(self, p: Patch) -> None:
+        """
+        central server has sent us a patch
+        """
+        if log.isEnabledFor(logging.DEBUG):
+            if len(p.addQuads) > 50:
+                log.debug('server has sent us %s', p.shortSummary())
+            else:
+                log.debug('server has sent us %s', p)
+
+        self._applyPatchLocally(p)
+        try:
+            self.runDepsOnNewPatch(p)
+        except Exception:
+            # don't reflect this error back to the server; we did
+            # receive its patch correctly. However, we're in a bad
+            # state since some dependencies may not have rerun
+            traceback.print_exc()
+            log.warning("some graph dependencies may not have completely run")
+
+        if self.initiallySynced is not None:
+            self.initiallySynced.callback(None)
+            self.initiallySynced = None