Mercurial > code > home > repos > rdfdb
changeset 89:1120c6489888
refactor, redo the SyncedGraph class ordering, fix most typing errors
author | drewp@bigasterisk.com |
---|---|
date | Tue, 05 Apr 2022 00:54:53 -0700 |
parents | 219d9e89b15c |
children | f9282b33b8d0 |
files | rdfdb/autodepgraphapi.py rdfdb/currentstategraphapi.py rdfdb/grapheditapi.py rdfdb/grapheditapi_test.py rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/service.py rdfdb/syncedgraph.py rdfdb/syncedgraph_base.py |
diffstat | 9 files changed, 372 insertions(+), 320 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/autodepgraphapi.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/autodepgraphapi.py Tue Apr 05 00:54:53 2022 -0700 @@ -1,5 +1,6 @@ import logging from typing import Callable, Dict, List, Set, Tuple +from rdfdb.syncedgraph_base import SyncedGraphBase from rdflib import RDF, RDFS, URIRef @@ -8,7 +9,7 @@ log = logging.getLogger('autodepgraphapi') -class AutoDepGraphApi(object): +class AutoDepGraphApi(SyncedGraphBase): """ knockoutjs-inspired API for automatically building a dependency tree while reading the graph. See addHandler(). @@ -18,7 +19,7 @@ section of code. This is supposed to make it easier to notice dependency mistakes, especially when porting old code to use SyncedGraph. - + This class is a mixin for SyncedGraph, separated here because these methods work together """ @@ -137,7 +138,7 @@ """currently this needs to be in an addHandler section, but it sets no watchers so it won't actually update if the statement was added or dropped from contexts""" - #func = self._getCurrentFunc() + # func = self._getCurrentFunc() return contextsForStatementNoWildcards(self._graph, triple) # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell @@ -184,7 +185,7 @@ this removes the handlers that it gives you """ - #self.dependencies() + # self.dependencies() ret: Set[Callable[[], None]] = set() affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads]) for (s, p), funcs in self._handlersSp.items():
--- a/rdfdb/currentstategraphapi.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/currentstategraphapi.py Tue Apr 05 00:54:53 2022 -0700 @@ -2,21 +2,63 @@ import logging import time import traceback +from typing import Optional, Set, Tuple from rdflib import ConjunctiveGraph, URIRef +from rdflib.term import Node from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement +from rdfdb.readonly_graph import ReadOnlyConjunctiveGraph +from rdfdb.syncedgraph_base import SyncedGraphBase log = logging.getLogger("currentstate") +TripleFilter = Tuple[Optional[Node], Optional[Node], Optional[Node]] -class CurrentStateGraphApi(object): +class Mgr(object): + + def __init__(self, graph, tripleFilter): + self._graph = graph + self._tripleFilter = tripleFilter + + def __enter__(self): + # this should be a readonly view of the existing + # graph, maybe with something to guard against + # writes/patches happening while reads are being + # done. Typical usage will do some reads on this graph + # before moving on to writes. + + if True: + g = ReadOnlyConjunctiveGraph(self._graph) + else: + t1 = time.time() + g = ConjunctiveGraph() + for s, p, o, c in self._graph.quads(self._tripleFilter): + g.store.add((s, p, o), c) + + if self._tripleFilter == (None, None, None): + self.logThisCopy(g, time.time() - t1) + + g.contextsForStatement = lambda stmt: contextsForStatementNoWildcards(g, stmt) + return g + + def logThisCopy(self, g, sec): + log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000)) + for frame in traceback.format_stack(limit=4)[:-2]: + for line in frame.splitlines(): + log.info(" " + line) + + def __exit__(self, type, val, tb): + return + + +class CurrentStateGraphApi(SyncedGraphBase): """ mixin for SyncedGraph, separated here because these methods work together """ - def currentState(self, context=None, tripleFilter=(None, None, None)): + def currentState(self, context: Optional[URIRef] = None, tripleFilter: TripleFilter = (None, None, None)) -> Mgr: """ a graph you can read without being in an addHandler @@ -25,43 +67,11 @@ if context is not None: raise NotImplementedError("currentState with context arg") - class Mgr(object): - - def __enter__(self2): - # this should be a readonly view of the existing - # graph, maybe with something to guard against - # writes/patches happening while reads are being - # done. Typical usage will do some reads on this graph - # before moving on to writes. - - if 1: - g = ReadOnlyConjunctiveGraph(self._graph) - else: - t1 = time.time() - g = ConjunctiveGraph() - for s, p, o, c in self._graph.quads(tripleFilter): - g.store.add((s, p, o), c) + return Mgr(self._graph, tripleFilter) - if tripleFilter == (None, None, None): - self2.logThisCopy(g, time.time() - t1) - - setattr(g, 'contextsForStatement', lambda t: contextsForStatementNoWildcards(g, t)) - return g + _reservedSequentials: Optional[Set[URIRef]] = None - def logThisCopy(self, g, sec): - log.info("copied graph %s statements (%.1f ms) " "because of this:" % (len(g), sec * 1000)) - for frame in traceback.format_stack(limit=4)[:-2]: - for line in frame.splitlines(): - log.info(" " + line) - - def __exit__(self, type, val, tb): - return - - return Mgr() - - _reservedSequentials = None # Optional[Set[URIRef]] - - def sequentialUri(self, prefix): + def sequentialUri(self, prefix: URIRef) -> URIRef: """ Prefix URIRef like http://example.com/r- will return http://example.com/r-1 if that uri is not a subject in the graph, @@ -74,6 +84,7 @@ if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))): self._reservedSequentials.add(newUri) return newUri + raise NotImplementedError("never reached") def contextsForStatementNoWildcards(g, triple):
--- a/rdfdb/grapheditapi.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/grapheditapi.py Tue Apr 05 00:54:53 2022 -0700 @@ -1,6 +1,8 @@ import logging import random from itertools import chain +from rdfdb.currentstategraphapi import CurrentStateGraphApi +from rdfdb.syncedgraph_base import SyncedGraphBase from rdflib import RDF, URIRef from rdflib.term import Node @@ -10,14 +12,14 @@ log = logging.getLogger('graphedit') -class GraphEditApi(object): +class GraphEditApi(CurrentStateGraphApi, SyncedGraphBase): """ fancier graph edits - + mixin for SyncedGraph, separated here because these methods work together """ - def getObjectPatch(self, context, subject, predicate, newObject): + def getObjectPatch(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node) -> Patch: """send a patch which removes existing values for (s,p,*,c) and adds (s,p,newObject,c). Values in other graphs are not affected. @@ -100,31 +102,3 @@ for spo in chain(self._graph.triples((None, None, node), context=context), self._graph.triples((node, None, None), context=context)) ]) self.patch(p) - - -import unittest - -from rdflib import ConjunctiveGraph - - -class TestPatchSubgraph(unittest.TestCase): - - def testCollapsesIdenticalQuads(self): - appliedPatches = [] - - class Obj(GraphEditApi): - - def patch(self, p): - appliedPatches.append(p) - - _graph: ConjunctiveGraph - - obj = Obj() - obj._graph = ConjunctiveGraph() - stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g')) - obj._graph.addN([stmt1]) - obj.patchSubgraph(URIRef('g'), [stmt1]) - self.assertEqual(len(appliedPatches), 1) - p = appliedPatches[0] - self.assertTrue(p.isNoop()) - self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/grapheditapi_test.py Tue Apr 05 00:54:53 2022 -0700 @@ -0,0 +1,30 @@ +import unittest + +from rdflib import ConjunctiveGraph, URIRef + +from rdfdb.grapheditapi import GraphEditApi + + +class TestPatchSubgraph(unittest.TestCase): + + def testCollapsesIdenticalQuads(self): + appliedPatches = [] + + class Obj(GraphEditApi): + def __init__(self): + pass + + def patch(self, p): + appliedPatches.append(p) + + _graph: ConjunctiveGraph + + obj = Obj() + obj._graph = ConjunctiveGraph() + stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g')) + obj._graph.addN([stmt1]) + obj.patchSubgraph(URIRef('g'), [stmt1]) + self.assertEqual(len(appliedPatches), 1) + p = appliedPatches[0] + self.assertTrue(p.isNoop()) + self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
--- a/rdfdb/graphfile.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/graphfile.py Tue Apr 05 00:54:53 2022 -0700 @@ -3,6 +3,7 @@ import time import traceback from typing import Dict, Optional, Protocol, cast +from pathlib import Path import twisted.internet.reactor from rdflib import Graph, URIRef @@ -39,7 +40,7 @@ one rdf file that we read from, write to, and notice external changes to """ - def __init__(self, notifier: INotify, path: str, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef], + def __init__(self, notifier: INotify, path: Path, uri: URIRef, patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef], ctxPrefixes: Dict[str, URIRef]): """ uri is the context for the triples in this file. We assume @@ -205,7 +206,7 @@ def flush(self) -> None: self.writeCall = None - tmpOut = self.path + ".rdfdb-temp" + tmpOut = str(self.path) + ".rdfdb-temp" f = open(tmpOut, 'wb') t1 = time.time() for p, n in (list(self.globalPrefixes.items()) + list(self.readPrefixes.items()) + list(self.ctxPrefixes.items())):
--- a/rdfdb/graphfile_test.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/graphfile_test.py Tue Apr 05 00:54:53 2022 -0700 @@ -1,13 +1,15 @@ import tempfile import unittest +from pathlib import Path from typing import cast import mock +from _pytest.assertion import truncate from rdflib import Graph, URIRef from twisted.internet.inotify import INotify from rdfdb.graphfile import GraphFile -from _pytest.assertion import truncate + truncate.DEFAULT_MAX_LINES = 9999 truncate.DEFAULT_MAX_CHARS = 9999 @@ -26,7 +28,7 @@ def getSubgraph(uri): return Graph() - gf = GraphFile(cast(INotify, mock.Mock()), tf.name, URIRef('uri'), mock.Mock(), getSubgraph, {}, {}) + gf = GraphFile(cast(INotify, mock.Mock()), Path(tf.name), URIRef('uri'), mock.Mock(), getSubgraph, {}, {}) gf.reread() newGraph = Graph() @@ -35,8 +37,7 @@ gf.flush() wroteContent = open(tf.name, 'rb').read() print(wroteContent) - self.assertEqual( - b'''@prefix : <http://example.com/> . + self.assertEqual(b'''@prefix : <http://example.com/> . @prefix n: <http://example.com/n/> . :boo n:two <http://example.com/other/ns> .
--- a/rdfdb/service.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/service.py Tue Apr 05 00:54:53 2022 -0700 @@ -4,22 +4,24 @@ import optparse import os import sys +from pathlib import Path from typing import Dict, List, Optional, cast import cyclone.web import cyclone.websocket import twisted.internet.error +import twisted.internet.reactor from rdflib import ConjunctiveGraph, Graph, URIRef -import twisted.internet.reactor from twisted.internet.inotify import IN_CREATE, INotify +from twisted.internet.interfaces import IReactorCore from twisted.python.failure import Failure from twisted.python.filepath import FilePath -from twisted.internet.interfaces import IReactorCore from rdfdb.file_vs_uri import (DirUriMap, correctToTopdirPrefix, fileForUri, uriFromFile) from rdfdb.graphfile import GetSubgraph, GraphFile, PatchCb from rdfdb.patch import ALLSTMTS, Patch from rdfdb.rdflibpatch import patchQuads + reactor = cast(IReactorCore, twisted.internet.reactor) # gatherProcessStats() @@ -46,6 +48,9 @@ pass +CtxPrefixes = Dict[Optional[URIRef], Dict[str, URIRef]] + + class WatchedFiles(object): """ find files, notice new files. @@ -53,7 +58,7 @@ This object watches directories. Each GraphFile watches its own file. """ - def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: Dict[str, URIRef]): + def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: CtxPrefixes): self.dirUriMap = dirUriMap # {abspath : uri prefix} self.patch, self.getSubgraph = patch, getSubgraph self.addlPrefixes = addlPrefixes @@ -70,10 +75,10 @@ try: for topdir in self.dirUriMap: for dirpath, dirnames, filenames in os.walk(topdir): + dirpath = Path(dirpath) for base in filenames: - p = os.path.join(dirpath, base) + p = dirpath / base # why wasn't mypy catching this? - assert isinstance(p, bytes) self.watchFile(p) self.notifier.watch(FilePath(dirpath), autoAdd=True, callbacks=[self.dirChange]) finally: @@ -81,13 +86,13 @@ def dirChange(self, watch, path: FilePath, mask): if mask & IN_CREATE: - if cast(str, path.path).endswith((b'~', b'.swp', b'swx', b'.rdfdb-temp')): + if cast(str, path.path).endswith(('~', '.swp', 'swx', '.rdfdb-temp')): return log.debug("%s created; consider adding a watch", path) - self.watchFile(path.path) + self.watchFile(Path(cast(str, path.path))) - def watchFile(self, inFile: bytes): + def watchFile(self, inFile: Path): """ consider adding a GraphFile to self.graphFiles @@ -99,10 +104,10 @@ return inFile = correctToTopdirPrefix(self.dirUriMap, inFile) - if os.path.splitext(inFile)[1] not in [b'.n3']: + if os.path.splitext(inFile)[1] not in ['.n3']: return - if b'/capture/' in inFile: + if '/capture/' in str(inFile): # smaller graph for now return @@ -111,13 +116,13 @@ # SyncedGraph calls graphFromNQuad on the incoming data and # has a parse error. I'm not sure where this should be fixed # yet. - if b'-rules' in inFile: + if '-rules' in str(inFile): return # for legacy versions, compile all the config stuff you want # read into one file called config.n3. New versions won't read # it. - if inFile.endswith(b"config.n3"): + if inFile.name == "config.n3": return ctx = uriFromFile(self.dirUriMap, inFile) @@ -138,19 +143,16 @@ """ if ctx not in self.graphFiles: outFile = fileForUri(self.dirUriMap, ctx) - # mypy missed the next line because of - # https://github.com/python/typeshed/issues/2937 ('str in - # bytes' isn't an error) - assert b'//' not in outFile, (outFile, self.dirUriMap, ctx) + assert '//' not in str(outFile), (outFile, self.dirUriMap, ctx) log.info("starting new file %r", outFile) self._addGraphFile(ctx, outFile) - def _addGraphFile(self, ctx, path): + def _addGraphFile(self, ctx: URIRef, path: Path): self.addlPrefixes.setdefault(ctx, {}) self.addlPrefixes.setdefault(None, {}) gf = GraphFile(self.notifier, path, ctx, self.patch, self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx]) self.graphFiles[ctx] = gf - fileStats.mappedGraphFiles = len(self.graphFiles) + # fileStats.mappedGraphFiles = len(self.graphFiles) return gf def dirtyFiles(self, ctxs): @@ -221,10 +223,14 @@ def __init__(self, dirUriMap: DirUriMap, addlPrefixes): self.clients: List[WebsocketClient] = [] self.graph = ConjunctiveGraph() + # stats.graphLen = len(self.graph) # stats.clients = len(self.clients) - self.watchedFiles = WatchedFiles(dirUriMap, self.patch, self.getSubgraph, addlPrefixes) + def callPatch(patch: Patch, dueToFileChange: bool = False): + self.patch(patch, dueToFileChange=dueToFileChange) + + self.watchedFiles = WatchedFiles(dirUriMap, callPatch, self.getSubgraph, addlPrefixes) self.summarizeToLog() @@ -245,12 +251,12 @@ # an error here needs to drop the sender, and reset everyone # else if we can't rollback the failing patch. patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True) - stats.graphLen = len(self.graph) + # stats.graphLen = len(self.graph) self._syncPatchToOtherClients(patch, sender) if not dueToFileChange: self.watchedFiles.dirtyFiles([ctx]) - graphStats.statements = len(self.graph) + # graphStats.statements = len(self.graph) def _syncPatchToOtherClients(self, p: Patch, sender: Optional[str] = None): for c in self.clients: @@ -266,12 +272,15 @@ log.info("%r %r - dropping client", c, err.getErrorMessage()) if c in self.clients: self.clients.remove(c) - stats.clients = len(self.clients) + # stats.clients = len(self.clients) def summarizeToLog(self): log.info("contexts in graph (%s total stmts):" % len(self.graph)) for c in self.graph.contexts(): - log.info(" %s: %s statements" % (c.identifier, len(self.getSubgraph(c.identifier)))) + ci = cast(URIRef, c.identifier) + g = self.getSubgraph(ci) + n = g.__len__() + log.info(" %s: %s statements" % (c.identifier, n)) def getSubgraph(self, uri: URIRef) -> Graph: """ @@ -282,7 +291,7 @@ and it's returning triples, but I think quads would be better """ # this is returning an empty Graph :( - #return self.graph.get_context(uri) + # return self.graph.get_context(uri) g = Graph() for s in self.graph.triples(ALLSTMTS, uri): @@ -293,7 +302,7 @@ log.info("new connection: sending all graphs to %r" % newClient) newClient.sendPatch(Patch(addQuads=self.graph.quads(ALLSTMTS), delQuads=[])) self.clients.append(newClient) - stats.clients = len(self.clients) + # stats.clients = len(self.clients) class GraphResource(cyclone.web.RequestHandler): @@ -335,7 +344,7 @@ def main(dirUriMap: Optional[DirUriMap] = None, prefixes: Optional[Dict[str, URIRef]] = None, port=9999): if dirUriMap is None: - dirUriMap = {b'data/': URIRef('http://example.com/data/')} + dirUriMap = {Path('data/'): URIRef('http://example.com/data/')} if prefixes is None: prefixes = { 'rdf': URIRef('http://www.w3.org/1999/02/22-rdf-syntax-ns#'), @@ -379,4 +388,4 @@ if __name__ == '__main__': - main() \ No newline at end of file + main()
--- a/rdfdb/syncedgraph.py Tue Apr 05 00:53:10 2022 -0700 +++ b/rdfdb/syncedgraph.py Tue Apr 05 00:54:53 2022 -0700 @@ -14,228 +14,27 @@ WsClientProtocol one connection with the rdfdb server. """ -import json -import logging -import traceback -import urllib.parse -from typing import Any, Optional, cast - -import autobahn.twisted.websocket -import treq -import twisted.internet.reactor -from rdflib import ConjunctiveGraph, URIRef -from twisted.internet import defer -from twisted.internet.interfaces import IReactorCore - +from typing import Optional from rdfdb.autodepgraphapi import AutoDepGraphApi -from rdfdb.currentstategraphapi import CurrentStateGraphApi from rdfdb.grapheditapi import GraphEditApi -from rdfdb.patch import Patch -from rdfdb.rdflibpatch import patchQuads # everybody who writes literals needs to get this from rdfdb.rdflibpatch_literal import patch - +from rdfdb.syncedgraph_base import SyncedGraphBase +from rdflib import URIRef patch() -reactor = cast(IReactorCore, twisted.internet.reactor) - -log = logging.getLogger('syncedgraph') - - -class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol): - """The server for this is service.WebsocketClient""" - - def __init__(self, sg): - super().__init__() - self.sg = sg - self.sg.currentClient = self - self.connectionId = None - - def onConnect(self, response): - log.info('conn %r', response) - - def onOpen(self): - log.info('ws open') - self.sg.isConnected = True - - def onMessage(self, payload, isBinary): - msg = json.loads(payload) - if 'connectedAs' in msg: - self.connectionId = msg['connectedAs'] - log.info(f'rdfdb calls us {self.connectionId}') - elif 'patch' in msg: - p = Patch(jsonRepr=payload.decode('utf8')) - log.debug("received patch %s", p.shortSummary()) - self.sg.onPatchFromDb(p) - else: - log.warn('unknown msg from websocket: %s...', payload[:32]) - - def sendPatch(self, p: Patch): - # this is where we could concatenate little patches into a - # bigger one. Often, many statements will cancel each - # other out. - - # also who's going to accumulate patches when server is down, - # or is that not allowed? - if self.connectionId is None: - raise ValueError("can't send patches before we get an id") - body = p.makeJsonRepr() - log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes') - self.sendMessage(body.encode('utf8')) - - def onClose(self, wasClean, code, reason): - log.info("WebSocket connection closed: {0}".format(reason)) - self.sg.lostRdfdbConnection() -class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi): - """ - graph for clients to use. Changes are synced with the master graph - in the rdfdb process. - - self.patch(p: Patch) is the only way to write to the graph. - - Reading can be done with the AutoDepGraphApi methods which set up - watchers to call you back when the results of the read have - changed (like knockoutjs). Or you can read with - CurrentStateGraphApi which doesn't have watchers, but you have to - opt into using it so it's clear you aren't in an auto-dep context - and meant to set up watchers. - - You may want to attach to self.initiallySynced deferred so you - don't attempt patches before we've heard the initial contents of - the graph. It would be ok to accumulate some patches of new - material, but usually you won't correctly remove the existing - statements unless we have the correct graph. - - If we get out of sync, we abandon our local graph (even any - pending local changes) and get the data again from the server. - """ - +class SyncedGraph(AutoDepGraphApi, GraphEditApi): + ''' + SyncedGraphBase + | | + CurState AutoDep + | | + GraphEdit | + | | + SyncedGraph + ''' def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): - """ - label is a string that the server will display in association - with your connection - - receiverHost is the hostname other nodes can use to talk to me - """ - self.isConnected = False - self.currentClient: Optional[WsClientProtocol] = None - self.rdfdbRoot = rdfdbRoot - self.connectSocket() - self.initiallySynced: defer.Deferred[None] = defer.Deferred() - self._graph = ConjunctiveGraph() - + SyncedGraphBase.__init__(self, rdfdbRoot, label, receiverHost) AutoDepGraphApi.__init__(self) - # this needs more state to track if we're doing a resync (and - # everything has to error or wait) or if we're live - - def lostRdfdbConnection(self) -> None: - self.isConnected = False - self.patch(Patch(delQuads=self._graph.quads())) - log.info(f'cleared graph to {len(self._graph)}') - log.error('graph is not updating- you need to restart') - self.connectSocket() - - def connectSocket(self) -> None: - factory = autobahn.twisted.websocket.WebSocketClientFactory( - self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph', - # Don't know if this is required by spec, but - # cyclone.websocket breaks with no origin header. - origin='foo') - factory.protocol = cast(Any, lambda: WsClientProtocol(self)) - - rr = urllib.parse.urlparse(self.rdfdbRoot) - reactor.connectTCP(rr.hostname, rr.port, factory) - # WsClientProtocol sets our currentClient. Needs rewrite using agents. - - def resync(self): - """ - get the whole graph again from the server (e.g. we had a - conflict while applying a patch and want to return to the - truth). - - To avoid too much churn, we remember our old graph and diff it - against the replacement. This way, our callers only see the - corrections. - Edits you make during a resync will surely be lost, so I - should just fail them. There should be a notification back to - UIs who want to show that we're doing a resync. - """ - log.info('resync') - if self.currentClient: - self.currentClient.dropConnection() - - def _resyncGraph(self, response): - log.warn("new graph in") - - if self.currentClient: - self.currentClient.dropConnection() - # diff against old entire graph - # broadcast that change - - def patch(self, p: Patch) -> None: - """send this patch to the server and apply it to our local - graph and run handlers""" - - if not self.isConnected or self.currentClient is None: - log.warn("not currently connected- dropping patch") - return - - if p.isNoop(): - log.info("skipping no-op patch") - return - - # these could fail if we're out of sync. One approach: - # Rerequest the full state from the server, try the patch - # again after that, then give up. - debugKey = '[id=%s]' % (id(p) % 1000) - log.debug("\napply local patch %s %s", debugKey, p) - try: - self._applyPatchLocally(p) - except ValueError as e: - log.error(e) - self.resync() - return - log.debug('runDepsOnNewPatch') - self.runDepsOnNewPatch(p) - log.debug('sendPatch') - self.currentClient.sendPatch(p) - log.debug('patch is done %s', debugKey) - - def suggestPrefixes(self, ctx, prefixes): - """ - when writing files for this ctx, try to use these n3 - prefixes. async, not guaranteed to finish before any - particular file flush - """ - treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8')) - - def _applyPatchLocally(self, p: Patch): - # .. and disconnect on failure - patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) - log.debug("graph now has %s statements" % len(self._graph)) - - def onPatchFromDb(self, p): - """ - central server has sent us a patch - """ - if log.isEnabledFor(logging.DEBUG): - if len(p.addQuads) > 50: - log.debug('server has sent us %s', p.shortSummary()) - else: - log.debug('server has sent us %s', p) - - self._applyPatchLocally(p) - try: - self.runDepsOnNewPatch(p) - except Exception: - # don't reflect this error back to the server; we did - # receive its patch correctly. However, we're in a bad - # state since some dependencies may not have rerun - traceback.print_exc() - log.warn("some graph dependencies may not have completely run") - - if self.initiallySynced: - self.initiallySynced.callback(None) - self.initiallySynced = None
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph_base.py Tue Apr 05 00:54:53 2022 -0700 @@ -0,0 +1,226 @@ +import json +import logging +import traceback +import urllib.parse +from typing import Any, Optional, cast + +import autobahn.twisted.websocket +import treq +import twisted.internet.reactor +from rdflib import ConjunctiveGraph, URIRef +from twisted.internet import defer +from twisted.internet.interfaces import IReactorCore + +from rdfdb.patch import Patch +from rdfdb.rdflibpatch import patchQuads + + +class WsClientProtocol(autobahn.twisted.websocket.WebSocketClientProtocol): + """The server for this is service.WebsocketClient""" + + def __init__(self, sg): + super().__init__() + self.sg = sg + self.sg.currentClient = self + self.connectionId = None + + def onConnect(self, response): + log.info('conn %r', response) + + def onOpen(self): + log.info('ws open') + self.sg.isConnected = True + + def onMessage(self, payload, isBinary): + msg = json.loads(payload) + if 'connectedAs' in msg: + self.connectionId = msg['connectedAs'] + log.info(f'rdfdb calls us {self.connectionId}') + elif 'patch' in msg: + p = Patch(jsonRepr=payload.decode('utf8')) + log.debug("received patch %s", p.shortSummary()) + self.sg.onPatchFromDb(p) + else: + log.warn('unknown msg from websocket: %s...', payload[:32]) + + def sendPatch(self, p: Patch): + # this is where we could concatenate little patches into a + # bigger one. Often, many statements will cancel each + # other out. + + # also who's going to accumulate patches when server is down, + # or is that not allowed? + if self.connectionId is None: + raise ValueError("can't send patches before we get an id") + body = p.makeJsonRepr() + log.debug(f'connectionId={self.connectionId} sending patch {len(body)} bytes') + self.sendMessage(body.encode('utf8')) + + def onClose(self, wasClean, code, reason): + log.info("WebSocket connection closed: {0}".format(reason)) + self.sg.lostRdfdbConnection() + + +reactor = cast(IReactorCore, twisted.internet.reactor) + +log = logging.getLogger('syncedgraph') + + +class SyncedGraphBase(object): + """ + graph for clients to use. Changes are synced with the master graph + in the rdfdb process. + + self.patch(p: Patch) is the only way to write to the graph. + + Reading can be done with the AutoDepGraphApi methods which set up + watchers to call you back when the results of the read have + changed (like knockoutjs). Or you can read with + CurrentStateGraphApi which doesn't have watchers, but you have to + opt into using it so it's clear you aren't in an auto-dep context + and meant to set up watchers. + + You may want to attach to self.initiallySynced deferred so you + don't attempt patches before we've heard the initial contents of + the graph. It would be ok to accumulate some patches of new + material, but usually you won't correctly remove the existing + statements unless we have the correct graph. + + If we get out of sync, we abandon our local graph (even any + pending local changes) and get the data again from the server. + """ + + def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str] = None): + """ + label is a string that the server will display in association + with your connection + + receiverHost is the hostname other nodes can use to talk to me + """ + self.isConnected = False + self.currentClient: Optional[WsClientProtocol] = None + self.rdfdbRoot = rdfdbRoot + self.connectSocket() + self.initiallySynced: defer.Deferred[None] = defer.Deferred() + self._graph = ConjunctiveGraph() + + # todo: + # AutoDepGraphApi.__init__(self) + + # this needs more state to track if we're doing a resync (and + # everything has to error or wait) or if we're live + + def lostRdfdbConnection(self) -> None: + self.isConnected = False + self.patch(Patch(delQuads=self._graph.quads())) + log.info(f'cleared graph to {len(self._graph)}') + log.error('graph is not updating- you need to restart') + self.connectSocket() + + def connectSocket(self) -> None: + factory = autobahn.twisted.websocket.WebSocketClientFactory( + self.rdfdbRoot.replace('http://', 'ws://') + 'syncedGraph', + # Don't know if this is required by spec, but + # cyclone.websocket breaks with no origin header. + origin='foo') + factory.protocol = cast(Any, lambda: WsClientProtocol(self)) + + rr = urllib.parse.urlparse(self.rdfdbRoot) + reactor.connectTCP(rr.hostname, rr.port, factory) + # WsClientProtocol sets our currentClient. Needs rewrite using agents. + + def resync(self): + """ + get the whole graph again from the server (e.g. we had a + conflict while applying a patch and want to return to the + truth). + + To avoid too much churn, we remember our old graph and diff it + against the replacement. This way, our callers only see the + corrections. + + Edits you make during a resync will surely be lost, so I + should just fail them. There should be a notification back to + UIs who want to show that we're doing a resync. + """ + log.info('resync') + if self.currentClient: + self.currentClient.dropConnection() + + def _resyncGraph(self, response): + log.warn("new graph in") + + if self.currentClient: + self.currentClient.dropConnection() + # diff against old entire graph + # broadcast that change + + def runDepsOnNewPatch(self, p): + # See AutoDepGraphApi + pass + + def patch(self, p: Patch) -> None: + """send this patch to the server and apply it to our local + graph and run handlers""" + + if not self.isConnected or self.currentClient is None: + log.warn("not currently connected- dropping patch") + return + + if p.isNoop(): + log.info("skipping no-op patch") + return + + # these could fail if we're out of sync. One approach: + # Rerequest the full state from the server, try the patch + # again after that, then give up. + debugKey = '[id=%s]' % (id(p) % 1000) + log.debug("\napply local patch %s %s", debugKey, p) + try: + self._applyPatchLocally(p) + except ValueError as e: + log.error(e) + self.resync() + return + log.debug('runDepsOnNewPatch') + self.runDepsOnNewPatch(p) + log.debug('sendPatch') + self.currentClient.sendPatch(p) + log.debug('patch is done %s', debugKey) + + def suggestPrefixes(self, ctx, prefixes): + """ + when writing files for this ctx, try to use these n3 + prefixes. async, not guaranteed to finish before any + particular file flush + """ + treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8')) + + def _applyPatchLocally(self, p: Patch): + # .. and disconnect on failure + patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) + log.debug("graph now has %s statements" % len(self._graph)) + + def onPatchFromDb(self, p): + """ + central server has sent us a patch + """ + if log.isEnabledFor(logging.DEBUG): + if len(p.addQuads) > 50: + log.debug('server has sent us %s', p.shortSummary()) + else: + log.debug('server has sent us %s', p) + + self._applyPatchLocally(p) + try: + self.runDepsOnNewPatch(p) + except Exception: + # don't reflect this error back to the server; we did + # receive its patch correctly. However, we're in a bad + # state since some dependencies may not have rerun + traceback.print_exc() + log.warn("some graph dependencies may not have completely run") + + if self.initiallySynced: + self.initiallySynced.callback(None) + self.initiallySynced = None