Mercurial > code > home > repos > rdfdb
changeset 45:dc61012eeace
python reformat
Ignore-this: 93c7e122cd60efa7ff4e8620d9bc10d3
author | Drew Perttula <drewp@bigasterisk.com> |
---|---|
date | Mon, 27 May 2019 07:45:24 +0000 |
parents | 2f188734adef |
children | 3b36b2c8ae65 |
files | rdfdb/autodepgraphapi.py rdfdb/currentstategraphapi.py rdfdb/currentstategraphapi_test.py rdfdb/file_vs_uri.py rdfdb/grapheditapi.py rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/localsyncedgraph.py rdfdb/mock_syncedgraph.py rdfdb/patch.py rdfdb/patchreceiver.py rdfdb/patchsender.py rdfdb/rdflibpatch.py rdfdb/rdflibpatch_literal.py rdfdb/service.py rdfdb/syncedgraph.py |
diffstat | 16 files changed, 440 insertions(+), 258 deletions(-) [+] |
line wrap: on
line diff
--- a/rdfdb/autodepgraphapi.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/autodepgraphapi.py Mon May 27 07:45:24 2019 +0000 @@ -4,6 +4,7 @@ from rdfdb.currentstategraphapi import contextsForStatementNoWildcards log = logging.getLogger('autodepgraphapi') + class AutoDepGraphApi(object): """ knockoutjs-inspired API for automatically building a dependency @@ -21,8 +22,9 @@ def __init__(self): self._watchers = _GraphWatchers() - self.currentFuncs: List[Callable[[], None]] = [] # stack of addHandler callers - + self.currentFuncs: List[Callable[[], None]] = [ + ] # stack of addHandler callers + def addHandler(self, func: Callable[[], None]) -> None: """ run this (idempotent) func, noting what graph values it @@ -54,7 +56,8 @@ raise finally: self.currentFuncs.pop() - log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs)) + log.debug('graph.currentFuncs pop %s. stack now has %s', func, + len(self.currentFuncs)) def runDepsOnNewPatch(self, p): """ @@ -86,14 +89,21 @@ # between calls, but probably most of them don't do that (they # work from a starting uri) - def value(self, subject=None, predicate=RDF.value, object=None, - default=None, any=True): + def value(self, + subject=None, + predicate=RDF.value, + object=None, + default=None, + any=True): if object is not None: raise NotImplementedError() func = self._getCurrentFunc() self._watchers.addSubjPredWatcher(func, subject, predicate) - return self._graph.value(subject, predicate, object=object, - default=default, any=any) + return self._graph.value(subject, + predicate, + object=object, + default=default, + any=any) def objects(self, subject=None, predicate=None): func = self._getCurrentFunc() @@ -112,7 +122,7 @@ func = self._getCurrentFunc() self._watchers.addSubjectWatcher(func, subject) return self._graph.predicate_objects(subject) - + def items(self, listUri): """generator. Having a chain of watchers on the results is not well-tested yet""" @@ -126,7 +136,6 @@ raise ValueError("List contains a recursive rdf:rest reference") chain.add(listUri) - def contains(self, triple): func = self._getCurrentFunc() self._watchers.addTripleWatcher(func, triple) @@ -144,17 +153,23 @@ # I'm going to be repeating that logic a lot. Maybe just for the # subjects(RDF.type, t) call + HandlerSet = Set[Callable[[], None]] + class _GraphWatchers(object): """ store the current handlers that care about graph changes """ + def __init__(self): - self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (s,p): set(handlers) - self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = {} # (p,o): set(handlers) - self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = {} # (s,p,o): set(handlers) - self._handlersS: Dict[URIRef, HandlerSet] = {} # s: set(handlers) + self._handlersSp: Dict[Tuple[URIRef, URIRef], HandlerSet] = { + } # (s,p): set(handlers) + self._handlersPo: Dict[Tuple[URIRef, URIRef], HandlerSet] = { + } # (p,o): set(handlers) + self._handlersSpo: Dict[Tuple[URIRef, URIRef, URIRef], HandlerSet] = { + } # (s,p,o): set(handlers) + self._handlersS: Dict[URIRef, HandlerSet] = {} # s: set(handlers) def addSubjPredWatcher(self, func, s, p): if func is None: @@ -174,7 +189,7 @@ def addSubjectWatcher(self, func, s): self._handlersS.setdefault(s, set()).add(func) - + def whoCares(self, patch): """what handler functions would care about the changes in this patch? @@ -182,34 +197,34 @@ """ #self.dependencies() ret: Set[Callable[[], None]] = set() - affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+ + affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads] + [(s, p) for s, p, o, c in patch.delQuads]) for (s, p), funcs in self._handlersSp.items(): if (s, p) in affectedSubjPreds: ret.update(funcs) funcs.clear() - affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+ - [(p, o) for s, p, o, c in patch.delQuads]) + affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads] + + [(p, o) for s, p, o, c in patch.delQuads]) for (p, o), funcs in self._handlersPo.items(): if (p, o) in affectedPredObjs: ret.update(funcs) funcs.clear() - affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads]+ + affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads] + [(s, p, o) for s, p, o, c in patch.delQuads]) for triple, funcs in self._handlersSpo.items(): if triple in affectedTriples: ret.update(funcs) funcs.clear() - - affectedSubjs = set([s for s, p, o, c in patch.addQuads]+ + + affectedSubjs = set([s for s, p, o, c in patch.addQuads] + [s for s, p, o, c in patch.delQuads]) for subj, funcs in self._handlersS.items(): if subj in affectedSubjs: ret.update(funcs) funcs.clear() - + return ret def dependencies(self):
--- a/rdfdb/currentstategraphapi.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/currentstategraphapi.py Mon May 27 07:45:24 2019 +0000 @@ -3,14 +3,17 @@ from rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement log = logging.getLogger("currentstate") + class ReadOnlyConjunctiveGraph(object): """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead of a bunch of Graphs""" + def __init__(self, graph): self.graph = graph def __getattr__(self, attr): - if attr in ['subjects', 'value', 'objects', 'triples', 'label']: # not complete + if attr in ['subjects', 'value', 'objects', 'triples', + 'label']: # not complete return getattr(self.graph, attr) raise TypeError("can't access %r of read-only graph" % attr) @@ -36,6 +39,7 @@ raise NotImplementedError("currentState with context arg") class Mgr(object): + def __enter__(self2): # this should be a readonly view of the existing # graph, maybe with something to guard against @@ -48,14 +52,14 @@ else: t1 = time.time() g = ConjunctiveGraph() - for s,p,o,c in self._graph.quads(tripleFilter): - g.store.add((s,p,o), c) + for s, p, o, c in self._graph.quads(tripleFilter): + g.store.add((s, p, o), c) if tripleFilter == (None, None, None): self2.logThisCopy(g, time.time() - t1) - - setattr(g, 'contextsForStatement', - lambda t: contextsForStatementNoWildcards(g, t)) + + setattr(g, 'contextsForStatement', lambda t: + contextsForStatementNoWildcards(g, t)) return g def logThisCopy(self, g, sec): @@ -63,15 +67,15 @@ "because of this:" % (len(g), sec * 1000)) for frame in traceback.format_stack(limit=4)[:-2]: for line in frame.splitlines(): - log.info(" "+line) + log.info(" " + line) def __exit__(self, type, val, tb): return return Mgr() - _reservedSequentials = None # Optional[Set[URIRef]] - + _reservedSequentials = None # Optional[Set[URIRef]] + def sequentialUri(self, prefix): """ Prefix URIRef like http://example.com/r- will return @@ -82,11 +86,12 @@ self._reservedSequentials = set() for i in itertools.count(1): newUri = URIRef(prefix + str(i)) - if newUri not in self._reservedSequentials and not list(self._graph.triples((newUri, None, None))): + if newUri not in self._reservedSequentials and not list( + self._graph.triples((newUri, None, None))): self._reservedSequentials.add(newUri) return newUri - + def contextsForStatementNoWildcards(g, triple): if None in triple: raise NotImplementedError("no wildcards")
--- a/rdfdb/currentstategraphapi_test.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/currentstategraphapi_test.py Mon May 27 07:45:24 2019 +0000 @@ -2,8 +2,12 @@ from rdflib import URIRef from rdfdb.syncedgraph import SyncedGraph + class TestSequentialUri(unittest.TestCase): + def test_returnsSequentialUris(self): g = SyncedGraph(URIRef('http://example.com/db/'), label='test') - self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo1')) - self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), URIRef('http://example.com/foo2')) + self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), + URIRef('http://example.com/foo1')) + self.assertEqual(g.sequentialUri(URIRef('http://example.com/foo')), + URIRef('http://example.com/foo2'))
--- a/rdfdb/file_vs_uri.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/file_vs_uri.py Mon May 27 07:45:24 2019 +0000 @@ -8,14 +8,17 @@ DirUriMap = Dict[bytes, URIRef] + def uriFromFile(dirUriMap: DirUriMap, filename: bytes) -> URIRef: assert filename.endswith(b'.n3'), filename for d, prefix in list(dirUriMap.items()): if filename.startswith(d): - return URIRef(prefix + filename[len(d):-len(b'.n3')].decode('ascii')) + return URIRef(prefix + + filename[len(d):-len(b'.n3')].decode('ascii')) raise ValueError("filename %s doesn't start with any of %s" % (filename, list(dirUriMap.keys()))) + def fileForUri(dirUriMap: DirUriMap, ctx: URIRef) -> bytes: assert isinstance(ctx, URIRef), ctx for d, prefix in dirUriMap.items(): @@ -23,6 +26,7 @@ return d + ctx[len(prefix):].encode('utf8') + b'.n3' raise ValueError("don't know what filename to use for %s" % ctx) + def correctToTopdirPrefix(dirUriMap: DirUriMap, inFile: bytes) -> bytes: if not any(inFile.startswith(prefix) for prefix in dirUriMap): for prefix in dirUriMap:
--- a/rdfdb/grapheditapi.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/grapheditapi.py Mon May 27 07:45:24 2019 +0000 @@ -5,6 +5,7 @@ from rdfdb.patch import Patch, quadsWithContextUris log = logging.getLogger('graphedit') + class GraphEditApi(object): """ fancier graph edits @@ -20,30 +21,35 @@ """ existing = [] - for spoc in quadsWithContextUris(self._graph.quads((subject, predicate, None, context))): + for spoc in quadsWithContextUris( + self._graph.quads((subject, predicate, None, context))): existing.append(spoc) - toAdd = ([(subject, predicate, newObject, context)] - if newObject is not None else []) + toAdd = ([(subject, predicate, newObject, + context)] if newObject is not None else []) return Patch(delQuads=existing, addQuads=toAdd).simplify() - def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, newObject: Node): + def patchObject(self, context: URIRef, subject: Node, predicate: URIRef, + newObject: Node): p = self.getObjectPatch(context, subject, predicate, newObject) if not p.isNoop(): log.debug("patchObject %r" % p.jsonRepr) - self.patch(p) # type: ignore + self.patch(p) # type: ignore def patchSubgraph(self, context, newGraph): """ replace all statements in 'context' with the quads in newGraph. This is not cooperating with currentState. """ - old = set(quadsWithContextUris(self._graph.quads((None, None, None, context)))) + old = set( + quadsWithContextUris(self._graph.quads( + (None, None, None, context)))) new = set(quadsWithContextUris(newGraph)) p = Patch(delQuads=old - new, addQuads=new - old) self.patch(p) - return p # for debugging - - def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue): + return p # for debugging + + def patchMapping(self, context, subject, predicate, nodeClass, keyPred, + valuePred, newKey, newValue): """ creates/updates a structure like this: @@ -59,25 +65,28 @@ # as long as currentState is expensive and has the # tripleFilter optimization, this looks like a mess. If # currentState became cheap, a lot of code here could go away. - - with self.currentState(tripleFilter=(subject, predicate, None)) as current: + + with self.currentState(tripleFilter=(subject, predicate, + None)) as current: adds = set([]) for setting in current.objects(subject, predicate): - with self.currentState(tripleFilter=(setting, keyPred, None)) as current2: - + with self.currentState(tripleFilter=(setting, keyPred, + None)) as current2: + match = current2.value(setting, keyPred) == newKey if match: break else: - setting = URIRef(subject + "/map/%s" % - random.randrange(999999999)) + setting = URIRef(subject + + "/map/%s" % random.randrange(999999999)) adds.update([ (subject, predicate, setting, context), (setting, RDF.type, nodeClass, context), (setting, keyPred, newKey, context), - ]) + ]) - with self.currentState(tripleFilter=(setting, valuePred, None)) as current: + with self.currentState(tripleFilter=(setting, valuePred, + None)) as current: dels = set([]) for prev in current.objects(setting, valuePred): dels.add((setting, valuePred, prev, context)) @@ -92,22 +101,30 @@ is the right amount of statements to remove a node that patchMapping made. """ - p = Patch(delQuads=[spo+(context,) for spo in - chain(self._graph.triples((None, None, node), - context=context), - self._graph.triples((node, None, None), - context=context))]) + p = Patch(delQuads=[ + spo + (context,) for spo in chain( + self._graph.triples((None, None, node), context=context), + self._graph.triples((node, None, None), context=context)) + ]) self.patch(p) + import unittest from rdflib import ConjunctiveGraph + + class TestPatchSubgraph(unittest.TestCase): + def testCollapsesIdenticalQuads(self): appliedPatches = [] + class Obj(GraphEditApi): + def patch(self, p): appliedPatches.append(p) + _graph: ConjunctiveGraph + obj = Obj() obj._graph = ConjunctiveGraph() stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g'))
--- a/rdfdb/graphfile.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/graphfile.py Mon May 27 07:45:24 2019 +0000 @@ -12,10 +12,12 @@ log = logging.getLogger('graphfile') iolog = logging.getLogger('io') + def patchN3SerializerToUseLessWhitespace(cutColumn=65): # todo: make a n3serializer subclass with whitespace settings from rdflib.plugins.serializers.turtle import TurtleSerializer, OBJECT originalWrite = TurtleSerializer.write + def write(self, s): lines = s.split('\n') if len(lines) > 1: @@ -23,7 +25,9 @@ else: self._column += len(lines[0]) return originalWrite(self, s) - TurtleSerializer.write = write # type: ignore + + TurtleSerializer.write = write # type: ignore + def predicateList(self, subject, newline=False): properties = self.buildPredicateHash(subject) propList = self.sortProperties(properties) @@ -38,6 +42,7 @@ self.write('\n' + self.indent(1)) self.verb(predicate, newline=False) self.objectList(properties[predicate]) + def objectList(self, objects): count = len(objects) if count == 0: @@ -51,34 +56,40 @@ self.depth -= depthmod originalStatement = TurtleSerializer.statement + def statement(self, subject): if list(self.store.triples((subject, RDF.type, None))): self.write('\n') originalStatement(self, subject) - return False # suppress blank line for 'minor' statements + return False # suppress blank line for 'minor' statements + TurtleSerializer.statement = statement # type: ignore TurtleSerializer.predicateList = predicateList # type: ignore TurtleSerializer.objectList = objectList # type: ignore + patchN3SerializerToUseLessWhitespace() class PatchCb(Protocol): - def __call__(self, patch: Patch, dueToFileChange: bool=False) -> None: ... + + def __call__(self, patch: Patch, dueToFileChange: bool = False) -> None: + ... + class GetSubgraph(Protocol): - def __call__(self, uri: URIRef) -> Graph: ... - + + def __call__(self, uri: URIRef) -> Graph: + ... + + class GraphFile(object): """ one rdf file that we read from, write to, and notice external changes to """ - def __init__(self, - notifier: INotify, - path: bytes, - uri: URIRef, - patch: PatchCb, - getSubgraph: GetSubgraph, + + def __init__(self, notifier: INotify, path: bytes, uri: URIRef, + patch: PatchCb, getSubgraph: GetSubgraph, globalPrefixes: Dict[str, URIRef], ctxPrefixes: Dict[str, URIRef]): """ @@ -93,12 +104,12 @@ self.path, self.uri = path, uri self.patch, self.getSubgraph = patch, getSubgraph - self.lastWriteTimestamp = 0.0 # mtime from the last time _we_ wrote + self.lastWriteTimestamp = 0.0 # mtime from the last time _we_ wrote self.globalPrefixes = globalPrefixes self.ctxPrefixes = ctxPrefixes self.readPrefixes: Dict[str, URIRef] = {} - + if not os.path.exists(path): # can't start notify until file exists try: @@ -113,17 +124,16 @@ # didn't work: self.lastWriteTimestamp = os.path.getmtime(path) - - self.flushDelay = 2 # seconds until we have to call flush() when dirty + self.flushDelay = 2 # seconds until we have to call flush() when dirty self.writeCall: Optional[IDelayedCall] = None self.notifier = notifier self.addWatch() - + def addWatch(self) -> None: # emacs save comes in as IN_MOVE_SELF, maybe - + # I was hoping not to watch IN_CHANGED and get lots of # half-written files, but emacs doesn't close its files after # a write, so there's no other event. I could try to sleep @@ -135,7 +145,7 @@ log.info("add watch on %s", self.path) self.notifier.watch(FilePath(self.path), callbacks=[self.notify]) - + def notify(self, notifier: INotify, filepath: FilePath, mask: int) -> None: try: maskNames = humanReadableMask(mask) @@ -145,9 +155,9 @@ self.fileGone() return else: - log.warn("%s delete_self event but file is here. " - "probably a new version moved in", - filepath) + log.warn( + "%s delete_self event but file is here. " + "probably a new version moved in", filepath) # we could filter these out in the watch() call, but I want # the debugging @@ -162,7 +172,7 @@ except OSError as e: log.error("%s: %r" % (filepath, e)) # getting OSError no such file, followed by no future reads - reactor.callLater(.5, self.addWatch) # ? + reactor.callLater(.5, self.addWatch) # ? return @@ -176,11 +186,12 @@ """ our file is gone; remove the statements from that context """ - myQuads = [(s,p,o,self.uri) for s,p,o in self.getSubgraph(self.uri)] + myQuads = [(s, p, o, self.uri) for s, p, o in self.getSubgraph(self.uri) + ] log.debug("dropping all statements from context %s", self.uri) if myQuads: self.patch(Patch(delQuads=myQuads), dueToFileChange=True) - + def reread(self) -> None: """update the graph with any diffs from this file @@ -191,7 +202,8 @@ try: contents = open(self.path).read() if contents.startswith("#new"): - log.debug("%s ignoring empty contents of my new file", self.path) + log.debug("%s ignoring empty contents of my new file", + self.path) # this is a new file we're starting, and we should not # patch our graph as if it had just been cleared. We # shouldn't even be here reading this, but @@ -244,7 +256,8 @@ self.writeCall.reset(self.flushDelay) else: # This awkward assignment is just to hide from mypy. - setattr(self, 'writeCall', reactor.callLater(self.flushDelay, self.flush)) + setattr(self, 'writeCall', + reactor.callLater(self.flushDelay, self.flush)) def flush(self) -> None: self.writeCall = None @@ -261,10 +274,8 @@ f.close() self.lastWriteTimestamp = os.path.getmtime(tmpOut) os.rename(tmpOut, self.path) - iolog.info("%s rewrote in %.1f ms", - self.path, serializeTime * 1000) - + iolog.info("%s rewrote in %.1f ms", self.path, serializeTime * 1000) + def __repr__(self) -> str: - return "%s(path=%r, uri=%r, ...)" % ( - self.__class__.__name__, self.path, self.uri) - + return "%s(path=%r, uri=%r, ...)" % (self.__class__.__name__, self.path, + self.uri)
--- a/rdfdb/graphfile_test.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/graphfile_test.py Mon May 27 07:45:24 2019 +0000 @@ -4,7 +4,9 @@ from rdflib import URIRef, Graph from rdfdb.graphfile import GraphFile + class TestGraphFileOutput(unittest.TestCase): + def testMaintainsN3PrefixesFromInput(self): tf = tempfile.NamedTemporaryFile(suffix='_test.n3') tf.write(b''' @@ -16,9 +18,11 @@ def getSubgraph(uri): return Graph() - gf = GraphFile(mock.Mock(), tf.name.encode('ascii'), URIRef('uri'), mock.Mock(), getSubgraph, {}, {}) + + gf = GraphFile(mock.Mock(), tf.name.encode('ascii'), URIRef('uri'), + mock.Mock(), getSubgraph, {}, {}) gf.reread() - + newGraph = Graph() newGraph.add((URIRef('http://example.com/boo'), URIRef('http://example.com/n/two'), @@ -26,7 +30,8 @@ gf.dirty(newGraph) gf.flush() wroteContent = open(tf.name, 'rb').read() - self.assertEqual(b'''@prefix : <http://example.com/> . + self.assertEqual( + b'''@prefix : <http://example.com/> . @prefix n: <http://example.com/n/> . @prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . @prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
--- a/rdfdb/localsyncedgraph.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/localsyncedgraph.py Mon May 27 07:45:24 2019 +0000 @@ -5,13 +5,14 @@ from rdfdb.grapheditapi import GraphEditApi from rdfdb.rdflibpatch import patchQuads + class LocalSyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi): """for tests""" + def __init__(self, files=None): self._graph = ConjunctiveGraph() for f in files or []: self._graph.parse(f, format='n3') - def patch(self, p): patchQuads(self._graph,
--- a/rdfdb/mock_syncedgraph.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/mock_syncedgraph.py Mon May 27 07:45:24 2019 +0000 @@ -1,25 +1,33 @@ - from rdflib import Graph, RDF, RDFS from rdflib.parser import StringInputSource + class MockSyncedGraph(object): """ Lets users of SyncedGraph mostly work. Doesn't yet help with any testing of the rerun-upon-graph-change behavior. """ + def __init__(self, n3Content): self._graph = Graph() self._graph.parse(StringInputSource(n3Content), format='n3') def addHandler(self, func): func() - - def value(self, subject=None, predicate=RDF.value, object=None, - default=None, any=True): + + def value(self, + subject=None, + predicate=RDF.value, + object=None, + default=None, + any=True): if object is not None: raise NotImplementedError() - return self._graph.value(subject, predicate, object=object, - default=default, any=any) + return self._graph.value(subject, + predicate, + object=object, + default=default, + any=any) def objects(self, subject=None, predicate=None): return self._graph.objects(subject, predicate) @@ -32,7 +40,7 @@ def predicate_objects(self, subject): return self._graph.predicate_objects(subject) - + def items(self, listUri): """generator. Having a chain of watchers on the results is not well-tested yet"""
--- a/rdfdb/patch.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/patch.py Mon May 27 07:45:24 2019 +0000 @@ -3,11 +3,11 @@ from typing import Optional XSD = Namespace("http://www.w3.org/2001/XMLSchema#") - from rdfdb.rdflibpatch import graphFromNQuad, graphFromQuads, serializeQuad ALLSTMTS = (None, None, None) + def quadsWithContextUris(quads): """ yield the given quads, correcting any context values that are @@ -15,12 +15,13 @@ """ if isinstance(quads, ConjunctiveGraph): quads = quads.quads(ALLSTMTS) - for s,p,o,c in quads: + for s, p, o, c in quads: if isinstance(c, Graph): c = c.identifier if not isinstance(c, URIRef): - raise TypeError("bad quad context type in %r" % ((s,p,o,c),)) - yield s,p,o,c + raise TypeError("bad quad context type in %r" % ((s, p, o, c),)) + yield s, p, o, c + class Patch(object): """ @@ -28,9 +29,13 @@ the json representation includes the {"patch":...} wrapper """ - def __init__(self, jsonRepr: Optional[str]=None, - addQuads=None, delQuads=None, - addGraph=None, delGraph=None): + + def __init__(self, + jsonRepr: Optional[str] = None, + addQuads=None, + delQuads=None, + addGraph=None, + delGraph=None): """ addQuads/delQuads can be lists or sets, but if we make them internally, they'll be lists @@ -49,20 +54,23 @@ self.senderUpdateUri = body['senderUpdateUri'] def __str__(self): + def shorten(n): if isinstance(n, Literal): if n.datatype == XSD['double']: return str(n.toPython()) return n.n3() + def formatQuad(quad): return " ".join(shorten(n) for n in quad) + delLines = [" -%s" % formatQuad(q) for q in self.delQuads] addLines = [" +%s" % formatQuad(q) for q in self.addQuads] return "\nPatch:\n" + "\n".join(delLines) + "\n" + "\n".join(addLines) def shortSummary(self): return "[-%s +%s]" % (len(self.delQuads), len(self.addQuads)) - + @classmethod def fromDiff(cls, oldGraph, newGraph): """ @@ -78,16 +86,16 @@ """ if self._jsonRepr and self._jsonRepr.strip(): raise NotImplementedError() - return bool(self._addQuads or self._delQuads or - self._addGraph or self._delGraph) + return bool(self._addQuads or self._delQuads or self._addGraph or + self._delGraph) @property def addQuads(self): if self._addQuads is None: if self._addGraph is None: return [] - self._addQuads = list(quadsWithContextUris( - self._addGraph.quads(ALLSTMTS))) + self._addQuads = list( + quadsWithContextUris(self._addGraph.quads(ALLSTMTS))) return self._addQuads @property @@ -95,8 +103,8 @@ if self._delQuads is None: if self._delGraph is None: return [] - self._delQuads = list(quadsWithContextUris( - self._delGraph.quads(ALLSTMTS))) + self._delQuads = list( + quadsWithContextUris(self._delGraph.quads(ALLSTMTS))) return self._delQuads @property @@ -118,10 +126,12 @@ return self._jsonRepr def makeJsonRepr(self, extraAttrs={}) -> str: - d = {"patch" : { - 'adds' : serializeQuad(self.addGraph), - 'deletes' : serializeQuad(self.delGraph), - }} + d = { + "patch": { + 'adds': serializeQuad(self.addGraph), + 'deletes': serializeQuad(self.delGraph), + } + } if len(self.addGraph) > 0 and d['patch']['adds'].strip() == "": # this is the bug that graphFromNQuad works around raise ValueError("nquads serialization failure") @@ -137,7 +147,7 @@ if not both: return self return Patch(addQuads=adds - both, delQuads=dels - both) - + def concat(self, more): """ new Patch with the result of applying this patch and the @@ -167,7 +177,9 @@ ctx = q[3] if ctx != q[3]: - raise ValueError("patch applies to multiple contexts, at least %r and %r" % (ctx, q[3])) + raise ValueError( + "patch applies to multiple contexts, at least %r and %r" % + (ctx, q[3])) if ctx is None: raise ValueError("patch affects no contexts") assert isinstance(ctx, URIRef), ctx @@ -175,11 +187,13 @@ def isNoop(self): return set(self.addQuads) == set(self.delQuads) - + stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1') + class TestPatchFromDiff(unittest.TestCase): + def testEmpty(self): g = ConjunctiveGraph() p = Patch.fromDiff(g, g) @@ -210,8 +224,10 @@ self.assertEqual(p.delQuads, [stmt1]) p = Patch.fromDiff(ConjunctiveGraph(), [stmt1]) self.assertEqual(p.addQuads, [stmt1]) - + + class TestPatchGetContext(unittest.TestCase): + def testEmptyPatchCantGiveContext(self): p = Patch() self.assertRaises(ValueError, p.getContext) @@ -221,7 +237,7 @@ self.assertEqual(p.getContext(), U('http://ctx1')) def testMultiContextPatchFailsToReturnContext(self): - p = Patch(addQuads=[stmt1[:3] + (U('http://ctx1'),), - stmt1[:3] + (U('http://ctx2'),)]) + p = Patch(addQuads=[ + stmt1[:3] + (U('http://ctx1'),), stmt1[:3] + (U('http://ctx2'),) + ]) self.assertRaises(ValueError, p.getContext) -
--- a/rdfdb/patchreceiver.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/patchreceiver.py Mon May 27 07:45:24 2019 +0000 @@ -4,12 +4,14 @@ from rdfdb.patch import Patch log = logging.getLogger('syncedgraph') + class PatchReceiver(object): """ runs a web server in this process and registers it with the rdfdb master. See onPatch for what happens when the rdfdb master sends us a patch """ + def __init__(self, rdfdbRoot, host, label, onPatch): """ label is what we'll call ourselves to the rdfdb server @@ -17,11 +19,13 @@ onPatch is what we call back when the server sends a patch """ self.rdfdbRoot = rdfdbRoot - listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[ - (r'/update', makePatchEndpoint(onPatch)), - ])) + listen = reactor.listenTCP( + 0, + cyclone.web.Application(handlers=[ + (r'/update', makePatchEndpoint(onPatch)), + ])) port = listen._realPortNumber # what's the right call for this? - + self.updateResource = 'http://%s:%s/update' % (host, port) log.info("listening on %s" % port) self._register(label) @@ -33,32 +37,40 @@ cyclone.httpclient.fetch( url=url, method=b'POST', - headers={b'Content-Type': [b'application/x-www-form-urlencoded']}, + headers={ + b'Content-Type': [b'application/x-www-form-urlencoded'] + }, postdata=body, - ).addCallbacks(self._done, - lambda err: self._registerError(err, url, body)) + ).addCallbacks( + self._done, lambda err: self._registerError(err, url, body)) log.info("registering with rdfdb at %s", url) def _registerError(self, err, url, body): log.error('registering to url=%r body=%r', url, body) log.error(err) - + def _done(self, x): log.debug("registered with rdfdb") - - + + def makePatchEndpointPutMethod(cb): + def put(self): try: p = Patch(jsonRepr=self.request.body) - log.debug("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph))) + log.debug("received patch -%d +%d" % + (len(p.delGraph), len(p.addGraph))) cb(p) except: traceback.print_exc() raise + return put + def makePatchEndpoint(cb): + class Update(cyclone.web.RequestHandler): put = makePatchEndpointPutMethod(cb) + return Update
--- a/rdfdb/patchsender.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/patchsender.py Mon May 27 07:45:24 2019 +0000 @@ -7,7 +7,8 @@ log = logging.getLogger('syncedgraph') -SendResult = defer.Deferred# to None +SendResult = defer.Deferred # to None + class PatchSender(object): """ @@ -15,6 +16,7 @@ them. This object buffers and may even collapse patches before they go the server """ + def __init__(self, target: URIRef, myUpdateResource): """ target is the URI we'll send patches to @@ -33,7 +35,7 @@ self._patchesToSend.append((p, sendResult)) self._continueSending() return sendResult - + def cancelAll(self): self._patchesToSend[:] = [] # we might be in the middle of a post; ideally that would be @@ -55,9 +57,11 @@ p = self._patchesToSend[0].concat(self._patchesToSend[1:]) print("concat down to") print('dels') - for q in p.delQuads: print(q) + for q in p.delQuads: + print(q) print('adds') - for q in p.addQuads: print(q) + for q in p.addQuads: + print(q) print("----") else: p, sendResult = self._patchesToSend.pop(0) @@ -96,6 +100,7 @@ log.error(e) self._continueSending() + def sendPatch(putUri: URIRef, patch: Patch, **kw) -> defer.Deferred: """ PUT a patch as json to an http server. Returns deferred. @@ -108,8 +113,10 @@ intro = body[:200] if len(body) > 200: intro = intro + "..." - log.debug("send body (rendered %.1fkB in %.1fms): %s", len(body) / 1024, jsonTime * 1000, intro) + log.debug("send body (rendered %.1fkB in %.1fms): %s", + len(body) / 1024, jsonTime * 1000, intro) sendTime = time.time() + def putDone(done): if not str(done.code).startswith('2'): raise ValueError("sendPatch request failed %s: %s" % @@ -121,6 +128,8 @@ return cyclone.httpclient.fetch( url=putUri.toPython().encode('ascii'), method=b'PUT', - headers={b'Content-Type': [b'application/json']}, + headers={ + b'Content-Type': [b'application/json'] + }, postdata=body.encode('utf8'), - ).addCallback(putDone) + ).addCallback(putDone)
--- a/rdfdb/rdflibpatch.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/rdflibpatch.py Mon May 27 07:45:24 2019 +0000 @@ -9,6 +9,7 @@ import unittest from rdflib import ConjunctiveGraph, Graph, URIRef as U, Literal + def patchQuads(graph, deleteQuads, addQuads, perfect=False): """ Delete the sequence of given quads. Then add the given quads just @@ -43,11 +44,13 @@ raise ValueError("%r already in %r" % (spoc[:3], spoc[3])) graph.addN(addQuads) + def fixContextToUri(spoc): if not isinstance(spoc[3], U): return spoc[:3] + (spoc[3].identifier,) return spoc - + + def inGraph(spoc, graph): """ c is just a URIRef. @@ -57,7 +60,7 @@ c = spoc[3] if isinstance(c, Graph): c = c.identifier - + for spoc2 in graph.quads(spoc[:3]): if spoc[:3] == spoc2[:3]: c2 = spoc2[3] @@ -67,22 +70,27 @@ return True return False + # some of the following workarounds may be fixed in https://github.com/RDFLib/rdflib/issues/299 def graphFromQuads(q): g = ConjunctiveGraph() #g.addN(q) # no effect on nquad output - for s,p,o,c in q: + for s, p, o, c in q: #g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code; you need this for json_ld serialize to work :( - g.store.add((s,p,o), c) # no effect on nquad output + g.store.add((s, p, o), c) # no effect on nquad output return g + def graphFromNQuad(text): g1 = ConjunctiveGraph() # text might omit ctx on some lines. rdflib just puts in a bnode, which shows up later. g1.parse(data=text, format='nquads') return g1 + from rdflib.plugins.serializers.nt import _quoteLiteral + + def serializeQuad(g): """ replacement for graph.serialize(format='nquads') @@ -92,72 +100,84 @@ TestGraphFromQuads.testSerializes. """ out = [] - for s,p,o,c in g.quads((None,None,None)): + for s, p, o, c in g.quads((None, None, None)): if isinstance(c, Graph): # still not sure why this is Graph sometimes, # already URIRef other times c = c.identifier if '[' in c.n3(): - import ipdb;ipdb.set_trace() + import ipdb + ipdb.set_trace() ntObject = _quoteLiteral(o) if isinstance(o, Literal) else o.n3() - out.append("%s %s %s %s .\n" % (s.n3(), - p.n3(), - ntObject, - c.n3())) + out.append("%s %s %s %s .\n" % (s.n3(), p.n3(), ntObject, c.n3())) return ''.join(out) + def inContext(graph, newContext): """ make a ConjunctiveGraph where all the triples in the given graph (or collection) are now in newContext (a uri) """ - return graphFromQuads((s,p,o,newContext) for s,p,o in graph) + return graphFromQuads((s, p, o, newContext) for s, p, o in graph) + def contextsForStatement(graph, triple): return [q[3] for q in graph.quads(triple)] -A = U("http://a"); B = U("http://b") +A = U("http://a") +B = U("http://b") + + class TestInContext(unittest.TestCase): + def testResultHasQuads(self): - g = inContext([(A,A,A)], B) - self.assertEqual(list(g.quads())[0], (A,A,A,B)) - + g = inContext([(A, A, A)], B) + self.assertEqual(list(g.quads())[0], (A, A, A, B)) + + class TestContextsForStatement(unittest.TestCase): + def testNotFound(self): - g = graphFromQuads([(A,A,A,A)]) - self.assertEqual(contextsForStatement(g, (B,B,B)), []) + g = graphFromQuads([(A, A, A, A)]) + self.assertEqual(contextsForStatement(g, (B, B, B)), []) + def testOneContext(self): - g = graphFromQuads([(A,A,A,A), (A,A,B,B)]) - self.assertEqual(contextsForStatement(g, (A,A,A)), [A]) + g = graphFromQuads([(A, A, A, A), (A, A, B, B)]) + self.assertEqual(contextsForStatement(g, (A, A, A)), [A]) + def testTwoContexts(self): - g = graphFromQuads([(A,A,A,A), (A,A,A,B)]) - self.assertEqual(sorted(contextsForStatement(g, (A,A,A))), sorted([A,B])) + g = graphFromQuads([(A, A, A, A), (A, A, A, B)]) + self.assertEqual(sorted(contextsForStatement(g, (A, A, A))), + sorted([A, B])) + # There's a case where contextsForStatement was returning a Graph # with identifier, which I've fixed without a test class TestInGraph(unittest.TestCase): + def testSimpleMatch(self): - g = graphFromQuads([(A,A,A,A)]) - self.assertTrue(inGraph((A,A,A,A), g)) + g = graphFromQuads([(A, A, A, A)]) + self.assertTrue(inGraph((A, A, A, A), g)) def testDontMatchDifferentStatement(self): - g = graphFromQuads([(A,A,A,A)]) - self.assertFalse(inGraph((B,B,B,B), g)) - + g = graphFromQuads([(A, A, A, A)]) + self.assertFalse(inGraph((B, B, B, B), g)) + def testDontMatchStatementInAnotherContext(self): - g = graphFromQuads([(A,A,A,A)]) - self.assertFalse(inGraph((A,A,A,B), g)) - - self.assertFalse(inGraph((A,A,A,Graph(identifier=B)), g)) - + g = graphFromQuads([(A, A, A, A)]) + self.assertFalse(inGraph((A, A, A, B), g)) + + self.assertFalse(inGraph((A, A, A, Graph(identifier=B)), g)) + class TestGraphFromQuads(unittest.TestCase): nqOut = '<http://example.com/> <http://example.com/> <http://example.com/> <http://example.com/> .\n' + def testSerializes(self): n = U("http://example.com/") - g = graphFromQuads([(n,n,n,n)]) + g = graphFromQuads([(n, n, n, n)]) out = serializeQuad(g) self.assertEqual(out.strip(), self.nqOut.strip()) @@ -168,29 +188,35 @@ self.assertEqual(out.strip(), self.nqOut.strip()) -A = U("http://a"); B = U("http://b"); C = U("http://c") -CTX1 = U('http://ctx1'); CTX2 = U('http://ctx2') +A = U("http://a") +B = U("http://b") +C = U("http://c") +CTX1 = U('http://ctx1') +CTX2 = U('http://ctx2') stmt1 = A, B, C, CTX1 stmt2 = A, B, C, CTX2 + + class TestPatchQuads(unittest.TestCase): + def testAddsToNewContext(self): g = ConjunctiveGraph() patchQuads(g, [], [stmt1]) self.assertEqual(len(g), 1) - quads = list(g.quads((None,None,None))) + quads = list(g.quads((None, None, None))) self.assertEqual(quads, [(A, B, C, Graph(identifier=CTX1))]) def testDeletes(self): g = ConjunctiveGraph() patchQuads(g, [], [stmt1]) patchQuads(g, [stmt1], []) - quads = list(g.quads((None,None,None))) + quads = list(g.quads((None, None, None))) self.assertEqual(quads, []) def testDeleteRunsBeforeAdd(self): g = ConjunctiveGraph() patchQuads(g, [stmt1], [stmt1]) - quads = list(g.quads((None,None,None))) + quads = list(g.quads((None, None, None))) self.assertEqual(quads, [(A, B, C, Graph(identifier=CTX1))]) def testPerfectAddRejectsExistingStmt(self): @@ -202,7 +228,7 @@ g = ConjunctiveGraph() patchQuads(g, [], [stmt1]) patchQuads(g, [], [stmt2], perfect=True) - self.assertEqual(len(list(g.quads((None,None,None)))), 2) + self.assertEqual(len(list(g.quads((None, None, None)))), 2) def testPerfectDeleteRejectsAbsentStmt(self): g = ConjunctiveGraph() @@ -212,7 +238,7 @@ g = ConjunctiveGraph() patchQuads(g, [], [stmt2]) self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True) - + def testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self): g = ConjunctiveGraph() patchQuads(g, [], [stmt1, stmt2]) @@ -222,4 +248,3 @@ g = ConjunctiveGraph() patchQuads(g, [], [stmt1, stmt1], perfect=True) patchQuads(g, [stmt1, stmt1], [], perfect=True) -
--- a/rdfdb/rdflibpatch_literal.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/rdflibpatch_literal.py Mon May 27 07:45:24 2019 +0000 @@ -6,22 +6,23 @@ from rdflib.term import _PLAIN_LITERAL_TYPES, _XSD_DOUBLE, _XSD_DECIMAL, Literal from re import sub + def _literal_n3(self, use_plain=False, qname_callback=None): if use_plain and self.datatype in _PLAIN_LITERAL_TYPES: try: - self.toPython() # check validity - # this is a bit of a mess - + self.toPython() # check validity + # this is a bit of a mess - # in py >=2.6 the string.format function makes this easier # we try to produce "pretty" output if self.datatype == _XSD_DOUBLE: # this is the drewp fix - return sub(r"\.?0*e","e", '%e' % float(self)) + return sub(r"\.?0*e", "e", '%e' % float(self)) elif self.datatype == _XSD_DECIMAL: - return sub("0*$","0",'%f' % float(self)) + return sub("0*$", "0", '%f' % float(self)) else: return '%s' % self except ValueError: - pass # if it's in, we let it out? + pass # if it's in, we let it out? encoded = self._quote_encode() @@ -44,11 +45,16 @@ else: return '%s' % encoded + def patch(): Literal._literal_n3 = _literal_n3 + import unittest + + class TestDoubleOutput(unittest.TestCase): + def testNoDanglingPoint(self): vv = Literal("0.88", datatype=_XSD_DOUBLE) out = _literal_n3(vv, use_plain=True)
--- a/rdfdb/service.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/service.py Mon May 27 07:45:24 2019 +0000 @@ -20,14 +20,16 @@ log = logging.getLogger('rdfdb') log.setLevel(logging.DEBUG) + class WebsocketDisconnect(ValueError): pass - + class Client(object): """ one of our syncedgraph clients """ + def __init__(self, updateUri: URIRef, label: str): self.label = label # todo: updateUri is used publicly to compare clients. Replace @@ -44,8 +46,10 @@ broken. """ return sendPatch(self.updateUri, p) - + + class WsClient(object): + def __init__(self, connectionId: str, sendMessage): self.updateUri = URIRef(connectionId) self.sendMessage = sendMessage @@ -57,29 +61,31 @@ self.sendMessage(p.makeJsonRepr()) return defer.succeed(None) + def sendGraphToClient(graph, client: Union[Client, WsClient]) -> None: """send the client the whole graph contents""" log.info("sending all graphs to %r" % client) - client.sendPatch(Patch( - addQuads=graph.quads(ALLSTMTS), - delQuads=[])) - + client.sendPatch(Patch(addQuads=graph.quads(ALLSTMTS), delQuads=[])) + + class WatchedFiles(object): """ find files, notice new files. This object watches directories. Each GraphFile watches its own file. """ - def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, getSubgraph: GetSubgraph, addlPrefixes: Dict[str, URIRef]): - self.dirUriMap = dirUriMap # {abspath : uri prefix} + + def __init__(self, dirUriMap: DirUriMap, patch: PatchCb, + getSubgraph: GetSubgraph, addlPrefixes: Dict[str, URIRef]): + self.dirUriMap = dirUriMap # {abspath : uri prefix} self.patch, self.getSubgraph = patch, getSubgraph self.addlPrefixes = addlPrefixes - - self.graphFiles: Dict[URIRef, GraphFile] = {} # context uri : GraphFile - + + self.graphFiles: Dict[URIRef, GraphFile] = {} # context uri : GraphFile + self.notifier = INotify() self.notifier.startReading() - + self.findAndLoadFiles() def findAndLoadFiles(self) -> None: @@ -92,7 +98,8 @@ # why wasn't mypy catching this? assert isinstance(p, bytes) self.watchFile(p) - self.notifier.watch(FilePath(dirpath), autoAdd=True, + self.notifier.watch(FilePath(dirpath), + autoAdd=True, callbacks=[self.dirChange]) finally: self.initialLoad = False @@ -101,10 +108,10 @@ if mask & IN_CREATE: if path.path.endswith((b'~', b'.swp', b'swx', b'.rdfdb-temp')): return - + log.debug("%s created; consider adding a watch", path) self.watchFile(path.path) - + def watchFile(self, inFile: bytes): """ consider adding a GraphFile to self.graphFiles @@ -123,7 +130,7 @@ if b'/capture/' in inFile: # smaller graph for now return - + # an n3 file with rules makes it all the way past this reading # and the serialization. Then, on the receiving side, a # SyncedGraph calls graphFromNQuad on the incoming data and @@ -137,7 +144,7 @@ # it. if inFile.endswith(b"config.n3"): return - + ctx = uriFromFile(self.dirUriMap, inFile) gf = self._addGraphFile(ctx, inFile) log.info("%s do initial read", inFile) @@ -166,14 +173,16 @@ def _addGraphFile(self, ctx, path): self.addlPrefixes.setdefault(ctx, {}) self.addlPrefixes.setdefault(None, {}) - gf = GraphFile(self.notifier, path, ctx, - self.patch, self.getSubgraph, + gf = GraphFile(self.notifier, + path, + ctx, + self.patch, + self.getSubgraph, globalPrefixes=self.addlPrefixes[None], ctxPrefixes=self.addlPrefixes[ctx]) - self.graphFiles[ctx] = gf + self.graphFiles[ctx] = gf return gf - def dirtyFiles(self, ctxs): """mark dirty the files that we watch in these contexts. @@ -187,22 +196,22 @@ g = self.getSubgraph(ctx) self.graphFiles[ctx].dirty(g) - + class Db(object): """ the master graph, all the connected clients, all the files we're watching """ + def __init__(self, dirUriMap: DirUriMap, addlPrefixes): self.clients: List[Union[Client, WsClient]] = [] self.graph = ConjunctiveGraph() - self.watchedFiles = WatchedFiles(dirUriMap, - self.patch, self.getSubgraph, - addlPrefixes) - + self.watchedFiles = WatchedFiles(dirUriMap, self.patch, + self.getSubgraph, addlPrefixes) + self.summarizeToLog() - def patch(self, patch: Patch, dueToFileChange: bool=False) -> None: + def patch(self, patch: Patch, dueToFileChange: bool = False) -> None: """ apply this patch to the master graph then notify everyone about it @@ -213,12 +222,12 @@ back to the sender with that updateUri """ ctx = patch.getContext() - log.info("patching graph %s -%d +%d" % ( - ctx, len(patch.delQuads), len(patch.addQuads))) + log.info("patching graph %s -%d +%d" % + (ctx, len(patch.delQuads), len(patch.addQuads))) - if hasattr(self, 'watchedFiles'): # not available during startup + if hasattr(self, 'watchedFiles'): # not available during startup self.watchedFiles.aboutToPatch(ctx) - + patchQuads(self.graph, patch.delQuads, patch.addQuads, perfect=True) self._sendPatch(patch) if not dueToFileChange: @@ -234,7 +243,7 @@ continue d = c.sendPatch(p) d.addErrback(self.clientErrored, c) - + def clientErrored(self, err, c): err.trap(twisted.internet.error.ConnectError, WebsocketDisconnect) log.info("%r %r - dropping client", c, err.getErrorMessage()) @@ -275,11 +284,16 @@ self.sendClientsToAllLivePages() def sendClientsToAllLivePages(self) -> None: - sendToLiveClients({"clients": [ - dict(updateUri=c.updateUri.toPython(), label=repr(c)) - for c in self.clients]}) + sendToLiveClients({ + "clients": [ + dict(updateUri=c.updateUri.toPython(), label=repr(c)) + for c in self.clients + ] + }) + class GraphResource(cyclone.web.RequestHandler): + def get(self): accept = self.request.headers.get('accept', '') format = 'n3' @@ -297,7 +311,9 @@ return self.write(self.settings.db.graph.serialize(format=format)) + class Patches(cyclone.web.RequestHandler): + def __init__(self, *args, **kw): cyclone.web.RequestHandler.__init__(self, *args, **kw) p = makePatchEndpointPutMethod(self.settings.db.patch) @@ -306,7 +322,9 @@ def get(self): pass + class GraphClients(cyclone.web.RequestHandler): + def get(self): pass @@ -318,17 +336,24 @@ import traceback traceback.print_exc() raise - + + class Prefixes(cyclone.web.RequestHandler): + def post(self): suggestion = json.loads(self.request.body) addlPrefixes = self.settings.db.watchedFiles.addlPrefixes - addlPrefixes.setdefault(URIRef(suggestion['ctx']), {}).update(suggestion['prefixes']) - + addlPrefixes.setdefault(URIRef(suggestion['ctx']), + {}).update(suggestion['prefixes']) + + _wsClientSerial = 0 + + class WebsocketClient(cyclone.websocket.WebSocketHandler): wsClient: Optional[WsClient] = None + def connectionMade(self, *args, **kwargs) -> None: global _wsClientSerial connectionId = f'connection-{_wsClientSerial}' @@ -340,8 +365,8 @@ def connectionLost(self, reason): log.info("bye ws client %r", self.wsClient) - self.settings.db.clientErrored( - Failure(WebsocketDisconnect(reason)), self.wsClient) + self.settings.db.clientErrored(Failure(WebsocketDisconnect(reason)), + self.wsClient) def messageReceived(self, message: bytes): if message == b'PING': @@ -353,6 +378,7 @@ p.senderUpdateUri = self.wsClient.updateUri self.settings.db.patch(p) + class Live(cyclone.websocket.WebSocketHandler): def connectionMade(self, *args, **kwargs): @@ -368,12 +394,16 @@ log.info("got message %s" % message) self.sendMessage(message) + liveClients: Set[Live] = set() + + def sendToLiveClients(d=None, asJson=None): j = asJson or json.dumps(d) for c in liveClients: c.sendMessage(j) + class NoExts(cyclone.web.StaticFileHandler): # .html pages can be get() without .html on them def get(self, path, *args, **kw): @@ -382,8 +412,8 @@ cyclone.web.StaticFileHandler.get(self, path, *args, **kw) -def main(dirUriMap: Optional[DirUriMap]=None, - prefixes: Optional[Dict[str, URIRef]]=None, +def main(dirUriMap: Optional[DirUriMap] = None, + prefixes: Optional[Dict[str, URIRef]] = None, port=9999): if dirUriMap is None: @@ -394,35 +424,39 @@ 'rdfs': URIRef('http://www.w3.org/2000/01/rdf-schema#'), 'xsd': URIRef('http://www.w3.org/2001/XMLSchema#'), } - + logging.basicConfig() log = logging.getLogger() parser = optparse.OptionParser() - parser.add_option("-v", "--verbose", action="store_true", + parser.add_option("-v", + "--verbose", + action="store_true", help="logging.DEBUG") (options, args) = parser.parse_args() log.setLevel(logging.DEBUG if options.verbose else logging.INFO) - db = Db(dirUriMap=dirUriMap, - addlPrefixes={None: prefixes}) + db = Db(dirUriMap=dirUriMap, addlPrefixes={None: prefixes}) from twisted.python import log as twlog twlog.startLogging(sys.stdout) - reactor.listenTCP(port, cyclone.web.Application(handlers=[ - (r'/live', Live), - (r'/graph', GraphResource), - (r'/patches', Patches), - (r'/graphClients', GraphClients), - (r'/syncedGraph', WebsocketClient), - (r'/prefixes', Prefixes), - - (r'/(.*)', NoExts, - {"path" : FilePath(__file__).sibling("web").path, - "default_filename" : "index.html"}), - - ], debug=True, db=db)) + reactor.listenTCP( + port, + cyclone.web.Application(handlers=[ + (r'/live', Live), + (r'/graph', GraphResource), + (r'/patches', Patches), + (r'/graphClients', GraphClients), + (r'/syncedGraph', WebsocketClient), + (r'/prefixes', Prefixes), + (r'/(.*)', NoExts, { + "path": FilePath(__file__).sibling("web").path, + "default_filename": "index.html" + }), + ], + debug=True, + db=db)) log.info("serving on %s" % port) reactor.run()
--- a/rdfdb/syncedgraph.py Mon May 27 07:01:56 2019 +0000 +++ b/rdfdb/syncedgraph.py Mon May 27 07:45:24 2019 +0000 @@ -55,7 +55,11 @@ pending local changes) and get the data again from the server. """ - def __init__(self, rdfdbRoot: URIRef, label: str, receiverHost: Optional[str]=None): + + def __init__(self, + rdfdbRoot: URIRef, + label: str, + receiverHost: Optional[str] = None): """ label is a string that the server will display in association with your connection @@ -64,19 +68,20 @@ """ if receiverHost is None: receiverHost = socket.gethostname() - + self.rdfdbRoot = rdfdbRoot self.initiallySynced: defer.Deferred[None] = defer.Deferred() self._graph = ConjunctiveGraph() - self._receiver = PatchReceiver(self.rdfdbRoot, receiverHost, label, self._onPatch) - + self._receiver = PatchReceiver(self.rdfdbRoot, receiverHost, label, + self._onPatch) + self._sender = PatchSender(self.rdfdbRoot + 'patches', self._receiver.updateResource) AutoDepGraphApi.__init__(self) # this needs more state to track if we're doing a resync (and # everything has to error or wait) or if we're live - + def resync(self): """ get the whole graph again from the server (e.g. we had a @@ -97,12 +102,14 @@ return cyclone.httpclient.fetch( url=self.rdfdbRoot + "graph", method=b"GET", - headers={b'Accept':[b'x-trig']}, - ).addCallback(self._resyncGraph) + headers={ + b'Accept': [b'x-trig'] + }, + ).addCallback(self._resyncGraph) def _resyncGraph(self, response): log.warn("new graph in") - + #diff against old entire graph #broadcast that change @@ -113,7 +120,7 @@ if p.isNoop(): log.info("skipping no-op patch") return - + # these could fail if we're out of sync. One approach: # Rerequest the full state from the server, try the patch # again after that, then give up. @@ -141,7 +148,10 @@ particular file flush """ treq.post(self.rdfdbRoot + 'prefixes', - json.dumps({'ctx': ctx, 'prefixes': prefixes}).encode('utf8')) + json.dumps({ + 'ctx': ctx, + 'prefixes': prefixes + }).encode('utf8')) def sendFailed(self, result): """ @@ -150,7 +160,7 @@ """ log.warn("sendFailed") self.resync() - + #i think we should receive back all the pending patches, #do a resync here, #then requeue all the pending patches (minus the failing one?) after that's done.