# HG changeset patch
# User drewp@bigasterisk.com
# Date 1649145293 25200
# Node ID 1120c64898882fb4ac0e6fbc13a80797f486bf1e
# Parent 219d9e89b15c4614a897cc4979be452010f8f40f
refactor, redo the SyncedGraph class ordering, fix most typing errors
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/autodepgraphapi.py
--- a/rdfdb/autodepgraphapi.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/autodepgraphapi.py Tue Apr 05 00:54:53 2022 -0700
@@ -1,5 +1,6 @@
import logging
from typing import Callable, Dict, List, Set, Tuple
+from rdfdb.syncedgraph_base import SyncedGraphBase
from rdflib import RDF, RDFS, URIRef
@@ -8,7 +9,7 @@
log = logging.getLogger('autodepgraphapi')
-class AutoDepGraphApi(object):
+class AutoDepGraphApi(SyncedGraphBase):
"""
knockoutjs-inspired API for automatically building a dependency
tree while reading the graph. See addHandler().
@@ -18,7 +19,7 @@
section of code. This is supposed to make it easier to notice
dependency mistakes, especially when porting old code to use
SyncedGraph.
-
+
This class is a mixin for SyncedGraph, separated here because
these methods work together
"""
@@ -137,7 +138,7 @@
"""currently this needs to be in an addHandler section, but it
sets no watchers so it won't actually update if the statement
was added or dropped from contexts"""
- #func = self._getCurrentFunc()
+ # func = self._getCurrentFunc()
return contextsForStatementNoWildcards(self._graph, triple)
# i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell
@@ -184,7 +185,7 @@
this removes the handlers that it gives you
"""
- #self.dependencies()
+ # self.dependencies()
ret: Set[Callable[[], None]] = set()
affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads])
for (s, p), funcs in self._handlersSp.items():
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/currentstategraphapi.py
--- a/rdfdb/currentstategraphapi.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/currentstategraphapi.py Tue Apr 05 00:54:53 2022 -0700
@@ -2,21 +2,63 @@
import logging
import time
import traceback
+from typing import Optional, Set, Tuple
from rdflib import ConjunctiveGraph, URIRef
+from rdflib.term import Node
from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement
+from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph
+from rdfdb.syncedgraph_base import SyncedGraphBase
log = logging.getLogger("currentstate")
+TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]]
-class CurrentStateGraphApi(object):
+class Mgr(object):
+
+ def __init__(self, graph, tripleFilter):
+ self._graph = graph
+ self._tripleFilter = tripleFilter
+
+ def __enter__(self):
+ # this should be a readonly view of the existing
+ # graph, maybe with something to guard against
+ # writes/patches happening while reads are being
+ # done. Typical usage will do some reads on this graph
+ # before moving on to writes.
+
+ if True:
+ g = ReadOnlyConjunctiveGraph(self._graph)
+ else:
+ t1 = time.time()
+ g = ConjunctiveGraph()
+ for s, p, o, c in self._graph.quads(self._tripleFilter):
+ g.store.add((s, p, o), c)
+
+ if self._tripleFilter == (None, None, None):
+ self.logThisCopy(g, time.time() - t1)
+
+ g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt)
+ return g
+
+ def logThisCopy(self, g, sec):
+ log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
+ for frame in traceback.format_stack(limit=4)[:-2]:
+ for line in frame.splitlines():
+ log.info(" " + line)
+
+ def __exit__(self, type, val, tb):
+ return
+
+
+class CurrentStateGraphApi(SyncedGraphBase):
"""
mixin for SyncedGraph, separated here because these methods work together
"""
- def currentState(self, context=None, tripleFilter=(None, None, None)):
+ def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr:
"""
a graph you can read without being in an addHandler
@@ -25,43 +67,11 @@
if context is not None:
raise NotImplementedError("currentState with context arg")
- class Mgr(object):
-
- def __enter__(self2):
- # this should be a readonly view of the existing
- # graph, maybe with something to guard against
- # writes/patches happening while reads are being
- # done. Typical usage will do some reads on this graph
- # before moving on to writes.
-
- if 1:
- g = ReadOnlyConjunctiveGraph(self._graph)
- else:
- t1 = time.time()
- g = ConjunctiveGraph()
- for s, p, o, c in self._graph.quads(tripleFilter):
- g.store.add((s, p, o), c)
+ return Mgr(self._graph, tripleFilter)
- if tripleFilter == (None, None, None):
- self2.logThisCopy(g, time.time() - t1)
-
- setattr(g, 'contextsForStatement', lambda t: contextsForStatementNoWildcards(g, t))
- return g
+ _reservedSequentials: Optional[Set[URIRef]] = None
- def logThisCopy(self, g, sec):
- log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000))
- for frame in traceback.format_stack(limit=4)[:-2]:
- for line in frame.splitlines():
- log.info(" " + line)
-
- def __exit__(self, type, val, tb):
- return
-
- return Mgr()
-
- _reservedSequentials = None # Optional[Set[URIRef]]
-
- def sequentialUri(self, prefix):
+ def sequentialUri(self, prefix: URIRef) -> URIRef:
"""
Prefix URIRef like http://example.com/r- will return
http://example.com/r-1 if that uri is not a subject in the graph,
@@ -74,6 +84,7 @@
if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))):
self._reservedSequentials.add(newUri)
return newUri
+ raise NotImplementedError("never reached")
def contextsForStatementNoWildcards(g, triple):
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/grapheditapi.py
--- a/rdfdb/grapheditapi.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/grapheditapi.py Tue Apr 05 00:54:53 2022 -0700
@@ -1,6 +1,8 @@
import logging
import random
from itertools import chain
+from rdfdb.currentstategraphapi import CurrentStateGraphApi
+from rdfdb.syncedgraph_base import SyncedGraphBase
from rdflib import RDF, URIRef
from rdflib.term import Node
@@ -10,14 +12,14 @@
log = logging.getLogger('graphedit')
-class GraphEditApi(object):
+class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase):
"""
fancier graph edits
-
+
mixin for SyncedGraph, separated here because these methods work together
"""
- def getObjectPatch(self, context, subject, predicate, newObject):
+ def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch:
"""send a patch which removes existing values for (s,p,*,c)
and adds (s,p,newObject,c). Values in other graphs are not affected.
@@ -100,31 +102,3 @@
for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context))
])
self.patch(p)
-
-
-import unittest
-
-from rdflib import ConjunctiveGraph
-
-
-class TestPatchSubgraph(unittest.TestCase):
-
- def testCollapsesIdenticalQuads(self):
- appliedPatches = []
-
- class Obj(GraphEditApi):
-
- def patch(self, p):
- appliedPatches.append(p)
-
- _graph: ConjunctiveGraph
-
- obj = Obj()
- obj._graph = ConjunctiveGraph()
- stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
- obj._graph.addN([stmt1])
- obj.patchSubgraph(URIRef('g'), [stmt1])
- self.assertEqual(len(appliedPatches), 1)
- p = appliedPatches[0]
- self.assertTrue(p.isNoop())
- self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/grapheditapi_test.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/grapheditapi_test.py Tue Apr 05 00:54:53 2022 -0700
@@ -0,0 +1,30 @@
+import unittest
+
+from rdflib import ConjunctiveGraph, URIRef
+
+from rdfdb.grapheditapi import GraphEditApi
+
+
+class TestPatchSubgraph(unittest.TestCase):
+
+ def testCollapsesIdenticalQuads(self):
+ appliedPatches = []
+
+ class Obj(GraphEditApi):
+ def __init__(self):
+ pass
+
+ def patch(self, p):
+ appliedPatches.append(p)
+
+ _graph: ConjunctiveGraph
+
+ obj = Obj()
+ obj._graph = ConjunctiveGraph()
+ stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
+ obj._graph.addN([stmt1])
+ obj.patchSubgraph(URIRef('g'), [stmt1])
+ self.assertEqual(len(appliedPatches), 1)
+ p = appliedPatches[0]
+ self.assertTrue(p.isNoop())
+ self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/graphfile.py
--- a/rdfdb/graphfile.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/graphfile.py Tue Apr 05 00:54:53 2022 -0700
@@ -3,6 +3,7 @@
import time
import traceback
from typing import Dict, Optional, Protocol, cast
+from pathlib import Path
import twisted.internet.reactor
from rdflib import Graph, URIRef
@@ -39,7 +40,7 @@
one rdf file that we read from, write to, and notice external changes to
"""
- def __init__(self, notifier: INotify, path: str, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef],
+ def __init__(self, notifier: INotify, path: Path, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef],
ctxPrefixes: Dict[str, URIRef]):
"""
uri is the context for the triples in this file. We assume
@@ -205,7 +206,7 @@
def flush(self) -> None:
self.writeCall = None
- tmpOut = self.path + ".rdfdb-temp"
+ tmpOut = str(self.path) + ".rdfdb-temp"
f = open(tmpOut, 'wb')
t1 = time.time()
for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/graphfile_test.py
--- a/rdfdb/graphfile_test.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/graphfile_test.py Tue Apr 05 00:54:53 2022 -0700
@@ -1,13 +1,15 @@
import tempfile
import unittest
+from pathlib import Path
from typing import cast
import mock
+from _pytest.assertion import truncate
from rdflib import Graph, URIRef
from twisted.internet.inotify import INotify
from rdfdb.graphfile import GraphFile
-from _pytest.assertion import truncate
+
truncate.DEFAULT_MAX_LINES = 9999
truncate.DEFAULT_MAX_CHARS = 9999
@@ -26,7 +28,7 @@
def getSubgraph(uri):
return Graph()
- gf = GraphFile(cast(INotify, mock.Mock()), tf.name, URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
+ gf = GraphFile(cast(INotify, mock.Mock()), Path(tf.name), URIRef('uri'), mock.Mock(), getSubgraph, {}, {})
gf.reread()
newGraph = Graph()
@@ -35,8 +37,7 @@
gf.flush()
wroteContent = open(tf.name, 'rb').read()
print(wroteContent)
- self.assertEqual(
- b'''@prefix : .
+ self.assertEqual(b'''@prefix : .
@prefix n: .
:boo n:two .
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/service.py
--- a/rdfdb/service.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/service.py Tue Apr 05 00:54:53 2022 -0700
@@ -4,22 +4,24 @@
import optparse
import os
import sys
+from pathlib import Path
from typing import Dict, List, Optional, cast
import cyclone.web
import cyclone.websocket
import twisted.internet.error
+import twisted.internet.reactor
from rdflib import ConjunctiveGraph, Graph, URIRef
-import twisted.internet.reactor
from twisted.internet.inotify import IN_CREATE, INotify
+from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.python.filepath import FilePath
-from twisted.internet.interfaces import IReactorCore
from rdfdb.file_vs_uri import (DirUriMap, correctToTopdirPrefix, fileForUri, uriFromFile)
from rdfdb.graphfile import GetSubgraph, GraphFile, PatchCb
from rdfdb.patch import ALLSTMTS, Patch
from rdfdb.rdflibpatch import patchQuads
+
reactor = cast(IReactorCore, twisted.internet.reactor)
# gatherProcessStats()
@@ -46,6 +48,9 @@
pass
+CtxPrefixes = Dict[Optional[URIRef], Dict[str, URIRef]]
+
+
class WatchedFiles(object):
"""
find files, notice new files.
@@ -53,7 +58,7 @@
This object watches directories. Each GraphFile watches its own file.
"""
- def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: Dict[str, URIRef]):
+ def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: CtxPrefixes):
self.dirUriMap = dirUriMap # {abspath : uri prefix}
self.patch, self.getSubgraph = patch, getSubgraph
self.addlPrefixes = addlPrefixes
@@ -70,10 +75,10 @@
try:
for topdir in self.dirUriMap:
for dirpath, dirnames, filenames in os.walk(topdir):
+ dirpath = Path(dirpath)
for base in filenames:
- p = os.path.join(dirpath, base)
+ p = dirpath / base
# why wasn't mypy catching this?
- assert isinstance(p, bytes)
self.watchFile(p)
self.notifier.watch(FilePath(dirpath), autoAdd=True, callbacks=[self.dirChange])
finally:
@@ -81,13 +86,13 @@
def dirChange(self, watch, path: FilePath, mask):
if mask & IN_CREATE:
- if cast(str, path.path).endswith((b'~', b'.swp', b'swx', b'.rdfdb-temp')):
+ if cast(str, path.path).endswith(('~', '.swp', 'swx', '.rdfdb-temp')):
return
log.debug("%s created; consider adding a watch", path)
- self.watchFile(path.path)
+ self.watchFile(Path(cast(str, path.path)))
- def watchFile(self, inFile: bytes):
+ def watchFile(self, inFile: Path):
"""
consider adding a GraphFile to self.graphFiles
@@ -99,10 +104,10 @@
return
inFile = correctToTopdirPrefix(self.dirUriMap, inFile)
- if os.path.splitext(inFile)[1] not in [b'.n3']:
+ if os.path.splitext(inFile)[1] not in ['.n3']:
return
- if b'/capture/' in inFile:
+ if '/capture/' in str(inFile):
# smaller graph for now
return
@@ -111,13 +116,13 @@
# SyncedGraph calls graphFromNQuad on the incoming data and
# has a parse error. I'm not sure where this should be fixed
# yet.
- if b'-rules' in inFile:
+ if '-rules' in str(inFile):
return
# for legacy versions, compile all the config stuff you want
# read into one file called config.n3. New versions won't read
# it.
- if inFile.endswith(b"config.n3"):
+ if inFile.name == "config.n3":
return
ctx = uriFromFile(self.dirUriMap, inFile)
@@ -138,19 +143,16 @@
"""
if ctx not in self.graphFiles:
outFile = fileForUri(self.dirUriMap, ctx)
- # mypy missed the next line because of
- # https://github.com/python/typeshed/issues/2937 ('str in
- # bytes' isn't an error)
- assert b'//' not in outFile, (outFile, self.dirUriMap, ctx)
+ assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx)
log.info("starting new file %r", outFile)
self._addGraphFile(ctx, outFile)
- def _addGraphFile(self, ctx, path):
+ def _addGraphFile(self, ctx: URIRef, path: Path):
self.addlPrefixes.setdefault(ctx, {})
self.addlPrefixes.setdefault(None, {})
gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx])
self.graphFiles[ctx] = gf
- fileStats.mappedGraphFiles = len(self.graphFiles)
+ # fileStats.mappedGraphFiles = len(self.graphFiles)
return gf
def dirtyFiles(self, ctxs):
@@ -221,10 +223,14 @@
def __init__(self, dirUriMap: DirUriMap, addlPrefixes):
self.clients: List[WebsocketClient] = []
self.graph = ConjunctiveGraph()
+
# stats.graphLen = len(self.graph)
# stats.clients = len(self.clients)
- self.watchedFiles = WatchedFiles(dirUriMap, self.patch, self.getSubgraph, addlPrefixes)
+ def callPatch(patch: Patch, dueToFileChange: bool = False):
+ self.patch(patch, dueToFileChange=dueToFileChange)
+
+ self.watchedFiles = WatchedFiles(dirUriMap, callPatch, self.getSubgraph, addlPrefixes)
self.summarizeToLog()
@@ -245,12 +251,12 @@
# an error here needs to drop the sender, and reset everyone
# else if we can't rollback the failing patch.
patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True)
- stats.graphLen = len(self.graph)
+ # stats.graphLen = len(self.graph)
self._syncPatchToOtherClients(patch, sender)
if not dueToFileChange:
self.watchedFiles.dirtyFiles([ctx])
- graphStats.statements = len(self.graph)
+ # graphStats.statements = len(self.graph)
def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None):
for c in self.clients:
@@ -266,12 +272,15 @@
log.info("%r %r - dropping client", c, err.getErrorMessage())
if c in self.clients:
self.clients.remove(c)
- stats.clients = len(self.clients)
+ # stats.clients = len(self.clients)
def summarizeToLog(self):
log.info("contexts in graph (%s total stmts):" % len(self.graph))
for c in self.graph.contexts():
- log.info(" %s: %s statements" % (c.identifier, len(self.getSubgraph(c.identifier))))
+ ci = cast(URIRef, c.identifier)
+ g = self.getSubgraph(ci)
+ n = g.__len__()
+ log.info(" %s: %s statements" % (c.identifier, n))
def getSubgraph(self, uri: URIRef) -> Graph:
"""
@@ -282,7 +291,7 @@
and it's returning triples, but I think quads would be better
"""
# this is returning an empty Graph :(
- #return self.graph.get_context(uri)
+ # return self.graph.get_context(uri)
g = Graph()
for s in self.graph.triples(ALLSTMTS, uri):
@@ -293,7 +302,7 @@
log.info("new connection: sending all graphs to %r" % newClient)
newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[]))
self.clients.append(newClient)
- stats.clients = len(self.clients)
+ # stats.clients = len(self.clients)
class GraphResource(cyclone.web.RequestHandler):
@@ -335,7 +344,7 @@
def main(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None, port=9999):
if dirUriMap is None:
- dirUriMap = {b'data/': URIRef('http://example.com/data/')}
+ dirUriMap = {Path('data/'): URIRef('http://example.com/data/')}
if prefixes is None:
prefixes = {
'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'),
@@ -379,4 +388,4 @@
if __name__ == '__main__':
- main()
\ No newline at end of file
+ main()
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/syncedgraph.py
--- a/rdfdb/syncedgraph.py Tue Apr 05 00:53:10 2022 -0700
+++ b/rdfdb/syncedgraph.py Tue Apr 05 00:54:53 2022 -0700
@@ -14,228 +14,27 @@
WsClientProtocol one connection with the rdfdb server.
"""
-import json
-import logging
-import traceback
-import urllib.parse
-from typing import Any, Optional, cast
-
-import autobahn.twisted.websocket
-import treq
-import twisted.internet.reactor
-from rdflib import ConjunctiveGraph, URIRef
-from twisted.internet import defer
-from twisted.internet.interfaces import IReactorCore
-
+from typing import Optional
from rdfdb.autodepgraphapi import AutoDepGraphApi
-from rdfdb.currentstategraphapi import CurrentStateGraphApi
from rdfdb.grapheditapi import GraphEditApi
-from rdfdb.patch import Patch
-from rdfdb.rdflibpatch import patchQuads
# everybody who writes literals needs to get this
from rdfdb.rdflibpatch_literal import patch
-
+from rdfdb.syncedgraph_base import SyncedGraphBase
+from rdflib import URIRef
patch()
-reactor = cast(IReactorCore, twisted.internet.reactor)
-
-log = logging.getLogger('syncedgraph')
-
-
-class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
- """The server for this is service.WebsocketClient"""
-
- def __init__(self, sg):
- super().__init__()
- self.sg = sg
- self.sg.currentClient = self
- self.connectionId = None
-
- def onConnect(self, response):
- log.info('conn %r', response)
-
- def onOpen(self):
- log.info('ws open')
- self.sg.isConnected = True
-
- def onMessage(self, payload, isBinary):
- msg = json.loads(payload)
- if 'connectedAs' in msg:
- self.connectionId = msg['connectedAs']
- log.info(f'rdfdb calls us {self.connectionId}')
- elif 'patch' in msg:
- p = Patch(jsonRepr=payload.decode('utf8'))
- log.debug("received patch %s", p.shortSummary())
- self.sg.onPatchFromDb(p)
- else:
- log.warn('unknown msg from websocket: %s...', payload[:32])
-
- def sendPatch(self, p: Patch):
- # this is where we could concatenate little patches into a
- # bigger one. Often, many statements will cancel each
- # other out.
-
- # also who's going to accumulate patches when server is down,
- # or is that not allowed?
- if self.connectionId is None:
- raise ValueError("can't send patches before we get an id")
- body = p.makeJsonRepr()
- log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
- self.sendMessage(body.encode('utf8'))
-
- def onClose(self, wasClean, code, reason):
- log.info("WebSocket connection closed: {0}".format(reason))
- self.sg.lostRdfdbConnection()
-class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
- """
- graph for clients to use. Changes are synced with the master graph
- in the rdfdb process.
-
- self.patch(p: Patch) is the only way to write to the graph.
-
- Reading can be done with the AutoDepGraphApi methods which set up
- watchers to call you back when the results of the read have
- changed (like knockoutjs). Or you can read with
- CurrentStateGraphApi which doesn't have watchers, but you have to
- opt into using it so it's clear you aren't in an auto-dep context
- and meant to set up watchers.
-
- You may want to attach to self.initiallySynced deferred so you
- don't attempt patches before we've heard the initial contents of
- the graph. It would be ok to accumulate some patches of new
- material, but usually you won't correctly remove the existing
- statements unless we have the correct graph.
-
- If we get out of sync, we abandon our local graph (even any
- pending local changes) and get the data again from the server.
- """
-
+class SyncedGraph(AutoDepGraphApi, GraphEditApi):
+ '''
+ SyncedGraphBase
+ | |
+ CurState AutoDep
+ | |
+ GraphEdit |
+ | |
+ SyncedGraph
+ '''
def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
- """
- label is a string that the server will display in association
- with your connection
-
- receiverHost is the hostname other nodes can use to talk to me
- """
- self.isConnected = False
- self.currentClient: Optional[WsClientProtocol] = None
- self.rdfdbRoot = rdfdbRoot
- self.connectSocket()
- self.initiallySynced: defer.Deferred[None] = defer.Deferred()
- self._graph = ConjunctiveGraph()
-
+ SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost)
AutoDepGraphApi.__init__(self)
- # this needs more state to track if we're doing a resync (and
- # everything has to error or wait) or if we're live
-
- def lostRdfdbConnection(self) -> None:
- self.isConnected = False
- self.patch(Patch(delQuads=self._graph.quads()))
- log.info(f'cleared graph to {len(self._graph)}')
- log.error('graph is not updating- you need to restart')
- self.connectSocket()
-
- def connectSocket(self) -> None:
- factory = autobahn.twisted.websocket.WebSocketClientFactory(
- self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
- # Don't know if this is required by spec, but
- # cyclone.websocket breaks with no origin header.
- origin='foo')
- factory.protocol = cast(Any, lambda: WsClientProtocol(self))
-
- rr = urllib.parse.urlparse(self.rdfdbRoot)
- reactor.connectTCP(rr.hostname, rr.port, factory)
- # WsClientProtocol sets our currentClient. Needs rewrite using agents.
-
- def resync(self):
- """
- get the whole graph again from the server (e.g. we had a
- conflict while applying a patch and want to return to the
- truth).
-
- To avoid too much churn, we remember our old graph and diff it
- against the replacement. This way, our callers only see the
- corrections.
- Edits you make during a resync will surely be lost, so I
- should just fail them. There should be a notification back to
- UIs who want to show that we're doing a resync.
- """
- log.info('resync')
- if self.currentClient:
- self.currentClient.dropConnection()
-
- def _resyncGraph(self, response):
- log.warn("new graph in")
-
- if self.currentClient:
- self.currentClient.dropConnection()
- # diff against old entire graph
- # broadcast that change
-
- def patch(self, p: Patch) -> None:
- """send this patch to the server and apply it to our local
- graph and run handlers"""
-
- if not self.isConnected or self.currentClient is None:
- log.warn("not currently connected- dropping patch")
- return
-
- if p.isNoop():
- log.info("skipping no-op patch")
- return
-
- # these could fail if we're out of sync. One approach:
- # Rerequest the full state from the server, try the patch
- # again after that, then give up.
- debugKey = '[id=%s]' % (id(p) % 1000)
- log.debug("\napply local patch %s %s", debugKey, p)
- try:
- self._applyPatchLocally(p)
- except ValueError as e:
- log.error(e)
- self.resync()
- return
- log.debug('runDepsOnNewPatch')
- self.runDepsOnNewPatch(p)
- log.debug('sendPatch')
- self.currentClient.sendPatch(p)
- log.debug('patch is done %s', debugKey)
-
- def suggestPrefixes(self, ctx, prefixes):
- """
- when writing files for this ctx, try to use these n3
- prefixes. async, not guaranteed to finish before any
- particular file flush
- """
- treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8'))
-
- def _applyPatchLocally(self, p: Patch):
- # .. and disconnect on failure
- patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
- log.debug("graph now has %s statements" % len(self._graph))
-
- def onPatchFromDb(self, p):
- """
- central server has sent us a patch
- """
- if log.isEnabledFor(logging.DEBUG):
- if len(p.addQuads) > 50:
- log.debug('server has sent us %s', p.shortSummary())
- else:
- log.debug('server has sent us %s', p)
-
- self._applyPatchLocally(p)
- try:
- self.runDepsOnNewPatch(p)
- except Exception:
- # don't reflect this error back to the server; we did
- # receive its patch correctly. However, we're in a bad
- # state since some dependencies may not have rerun
- traceback.print_exc()
- log.warn("some graph dependencies may not have completely run")
-
- if self.initiallySynced:
- self.initiallySynced.callback(None)
- self.initiallySynced = None
diff -r 219d9e89b15c -r 1120c6489888 rdfdb/syncedgraph_base.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/syncedgraph_base.py Tue Apr 05 00:54:53 2022 -0700
@@ -0,0 +1,226 @@
+import json
+import logging
+import traceback
+import urllib.parse
+from typing import Any, Optional, cast
+
+import autobahn.twisted.websocket
+import treq
+import twisted.internet.reactor
+from rdflib import ConjunctiveGraph, URIRef
+from twisted.internet import defer
+from twisted.internet.interfaces import IReactorCore
+
+from rdfdb.patch import Patch
+from rdfdb.rdflibpatch import patchQuads
+
+
+class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol):
+ """The server for this is service.WebsocketClient"""
+
+ def __init__(self, sg):
+ super().__init__()
+ self.sg = sg
+ self.sg.currentClient = self
+ self.connectionId = None
+
+ def onConnect(self, response):
+ log.info('conn %r', response)
+
+ def onOpen(self):
+ log.info('ws open')
+ self.sg.isConnected = True
+
+ def onMessage(self, payload, isBinary):
+ msg = json.loads(payload)
+ if 'connectedAs' in msg:
+ self.connectionId = msg['connectedAs']
+ log.info(f'rdfdb calls us {self.connectionId}')
+ elif 'patch' in msg:
+ p = Patch(jsonRepr=payload.decode('utf8'))
+ log.debug("received patch %s", p.shortSummary())
+ self.sg.onPatchFromDb(p)
+ else:
+ log.warn('unknown msg from websocket: %s...', payload[:32])
+
+ def sendPatch(self, p: Patch):
+ # this is where we could concatenate little patches into a
+ # bigger one. Often, many statements will cancel each
+ # other out.
+
+ # also who's going to accumulate patches when server is down,
+ # or is that not allowed?
+ if self.connectionId is None:
+ raise ValueError("can't send patches before we get an id")
+ body = p.makeJsonRepr()
+ log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes')
+ self.sendMessage(body.encode('utf8'))
+
+ def onClose(self, wasClean, code, reason):
+ log.info("WebSocket connection closed: {0}".format(reason))
+ self.sg.lostRdfdbConnection()
+
+
+reactor = cast(IReactorCore, twisted.internet.reactor)
+
+log = logging.getLogger('syncedgraph')
+
+
+class SyncedGraphBase(object):
+ """
+ graph for clients to use. Changes are synced with the master graph
+ in the rdfdb process.
+
+ self.patch(p: Patch) is the only way to write to the graph.
+
+ Reading can be done with the AutoDepGraphApi methods which set up
+ watchers to call you back when the results of the read have
+ changed (like knockoutjs). Or you can read with
+ CurrentStateGraphApi which doesn't have watchers, but you have to
+ opt into using it so it's clear you aren't in an auto-dep context
+ and meant to set up watchers.
+
+ You may want to attach to self.initiallySynced deferred so you
+ don't attempt patches before we've heard the initial contents of
+ the graph. It would be ok to accumulate some patches of new
+ material, but usually you won't correctly remove the existing
+ statements unless we have the correct graph.
+
+ If we get out of sync, we abandon our local graph (even any
+ pending local changes) and get the data again from the server.
+ """
+
+ def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None):
+ """
+ label is a string that the server will display in association
+ with your connection
+
+ receiverHost is the hostname other nodes can use to talk to me
+ """
+ self.isConnected = False
+ self.currentClient: Optional[WsClientProtocol] = None
+ self.rdfdbRoot = rdfdbRoot
+ self.connectSocket()
+ self.initiallySynced: defer.Deferred[None] = defer.Deferred()
+ self._graph = ConjunctiveGraph()
+
+ # todo:
+ # AutoDepGraphApi.__init__(self)
+
+ # this needs more state to track if we're doing a resync (and
+ # everything has to error or wait) or if we're live
+
+ def lostRdfdbConnection(self) -> None:
+ self.isConnected = False
+ self.patch(Patch(delQuads=self._graph.quads()))
+ log.info(f'cleared graph to {len(self._graph)}')
+ log.error('graph is not updating- you need to restart')
+ self.connectSocket()
+
+ def connectSocket(self) -> None:
+ factory = autobahn.twisted.websocket.WebSocketClientFactory(
+ self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph',
+ # Don't know if this is required by spec, but
+ # cyclone.websocket breaks with no origin header.
+ origin='foo')
+ factory.protocol = cast(Any, lambda: WsClientProtocol(self))
+
+ rr = urllib.parse.urlparse(self.rdfdbRoot)
+ reactor.connectTCP(rr.hostname, rr.port, factory)
+ # WsClientProtocol sets our currentClient. Needs rewrite using agents.
+
+ def resync(self):
+ """
+ get the whole graph again from the server (e.g. we had a
+ conflict while applying a patch and want to return to the
+ truth).
+
+ To avoid too much churn, we remember our old graph and diff it
+ against the replacement. This way, our callers only see the
+ corrections.
+
+ Edits you make during a resync will surely be lost, so I
+ should just fail them. There should be a notification back to
+ UIs who want to show that we're doing a resync.
+ """
+ log.info('resync')
+ if self.currentClient:
+ self.currentClient.dropConnection()
+
+ def _resyncGraph(self, response):
+ log.warn("new graph in")
+
+ if self.currentClient:
+ self.currentClient.dropConnection()
+ # diff against old entire graph
+ # broadcast that change
+
+ def runDepsOnNewPatch(self, p):
+ # See AutoDepGraphApi
+ pass
+
+ def patch(self, p: Patch) -> None:
+ """send this patch to the server and apply it to our local
+ graph and run handlers"""
+
+ if not self.isConnected or self.currentClient is None:
+ log.warn("not currently connected- dropping patch")
+ return
+
+ if p.isNoop():
+ log.info("skipping no-op patch")
+ return
+
+ # these could fail if we're out of sync. One approach:
+ # Rerequest the full state from the server, try the patch
+ # again after that, then give up.
+ debugKey = '[id=%s]' % (id(p) % 1000)
+ log.debug("\napply local patch %s %s", debugKey, p)
+ try:
+ self._applyPatchLocally(p)
+ except ValueError as e:
+ log.error(e)
+ self.resync()
+ return
+ log.debug('runDepsOnNewPatch')
+ self.runDepsOnNewPatch(p)
+ log.debug('sendPatch')
+ self.currentClient.sendPatch(p)
+ log.debug('patch is done %s', debugKey)
+
+ def suggestPrefixes(self, ctx, prefixes):
+ """
+ when writing files for this ctx, try to use these n3
+ prefixes. async, not guaranteed to finish before any
+ particular file flush
+ """
+ treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8'))
+
+ def _applyPatchLocally(self, p: Patch):
+ # .. and disconnect on failure
+ patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True)
+ log.debug("graph now has %s statements" % len(self._graph))
+
+ def onPatchFromDb(self, p):
+ """
+ central server has sent us a patch
+ """
+ if log.isEnabledFor(logging.DEBUG):
+ if len(p.addQuads) > 50:
+ log.debug('server has sent us %s', p.shortSummary())
+ else:
+ log.debug('server has sent us %s', p)
+
+ self._applyPatchLocally(p)
+ try:
+ self.runDepsOnNewPatch(p)
+ except Exception:
+ # don't reflect this error back to the server; we did
+ # receive its patch correctly. However, we're in a bad
+ # state since some dependencies may not have rerun
+ traceback.print_exc()
+ log.warn("some graph dependencies may not have completely run")
+
+ if self.initiallySynced:
+ self.initiallySynced.callback(None)
+ self.initiallySynced = None