changeset 0:d487d597ad33
initial split from light9/
author    Drew Perttula <drewp@bigasterisk.com>
date      Sun, 18 Feb 2018 08:06:43 +0000
parents   (none)
children  674833ada390
files     rdfdb/__init__.py rdfdb/autodepgraphapi.py rdfdb/clientsession.py rdfdb/currentstategraphapi.py rdfdb/file_vs_uri.py rdfdb/grapheditapi.py rdfdb/graphfile.py rdfdb/graphfile_test.py rdfdb/localsyncedgraph.py rdfdb/mock_syncedgraph.py rdfdb/patch.py rdfdb/patchreceiver.py rdfdb/patchsender.py rdfdb/rdflibpatch.py rdfdb/rdflibpatch_literal.py rdfdb/syncedgraph.py rdfdb/web/graphView.html rdfdb/web/index.html rdfdb/web/style.css rdfdb/web/syncedgraph.js
diffstat  19 files changed, 1851 insertions(+), 0 deletions(-)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/autodepgraphapi.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,221 @@ +import logging +from rdflib import RDF, RDFS +from light9.rdfdb.currentstategraphapi import contextsForStatementNoWildcards +log = logging.getLogger('autodepgraphapi') + +class AutoDepGraphApi(object): + """ + knockoutjs-inspired API for automatically building a dependency + tree while reading the graph. See addHandler(). + + This design is more aggressive than knockoutjs, since the obvious + query methods like value() all error if you're not in a watched + section of code. This is supposed to make it easier to notice + dependency mistakes, especially when porting old code to use + SyncedGraph. + + This class is a mixin for SyncedGraph, separated here because + these methods work together + """ + + def __init__(self): + self._watchers = _GraphWatchers() + self.currentFuncs = [] # stack of addHandler callers + + def addHandler(self, func): + """ + run this (idempotent) func, noting what graph values it + uses. Run it again in the future if there are changes to those + graph values. The func might use different values during that + future call, and those will be what we watch for next. + """ + + # if we saw this func before, we need to forget the old + # callbacks it wanted and replace with the new ones we see + # now. + + # if one handler func calls another, does that break anything? + # maybe not? + + # no plan for sparql queries yet. Hook into a lower layer that + # reveals all their statement fetches? Just make them always + # new? Cache their results, so if i make the query again and + # it gives the same result, I don't call the handler? + + self.currentFuncs.append(func) + log.debug('graph.currentFuncs push %s', func) + try: + try: + func() + except: + import traceback + traceback.print_exc() + raise + finally: + self.currentFuncs.pop() + log.debug('graph.currentFuncs pop %s. stack now has %s', func, len(self.currentFuncs)) + + def runDepsOnNewPatch(self, p): + """ + patch p just happened to the graph; call everyone back who + might care, and then notice what data they depend on now + """ + for func in self._watchers.whoCares(p): + # todo: forget the old handlers for this func + log.debug('runDepsOnNewPatch calling watcher %s', p) + self.addHandler(func) + + def _getCurrentFunc(self): + if not self.currentFuncs: + # this may become a warning later + raise ValueError("asked for graph data outside of a handler") + + # we add the watcher to the deepest function, since that + # should be the cheapest way to update when this part of the + # data changes + return self.currentFuncs[-1] + + # these just call through to triples() so it might be possible to + # watch just that one. + + # if you get a bnode in your response, maybe the answer to + # dependency tracking is to say that you depended on the triple + # that got you that bnode, since it is likely to change to another + # bnode later. 
This won't work if the receiver stores bnodes + # between calls, but probably most of them don't do that (they + # work from a starting uri) + + def value(self, subject=None, predicate=RDF.value, object=None, + default=None, any=True): + if object is not None: + raise NotImplementedError() + func = self._getCurrentFunc() + self._watchers.addSubjPredWatcher(func, subject, predicate) + return self._graph.value(subject, predicate, object=object, + default=default, any=any) + + def objects(self, subject=None, predicate=None): + func = self._getCurrentFunc() + self._watchers.addSubjPredWatcher(func, subject, predicate) + return self._graph.objects(subject, predicate) + + def label(self, uri): + return self.value(uri, RDFS.label) + + def subjects(self, predicate=None, object=None): + func = self._getCurrentFunc() + self._watchers.addPredObjWatcher(func, predicate, object) + return self._graph.subjects(predicate, object) + + def predicate_objects(self, subject): + func = self._getCurrentFunc() + self._watchers.addSubjectWatcher(func, subject) + return self._graph.predicate_objects(subject) + + def items(self, listUri): + """generator. Having a chain of watchers on the results is not + well-tested yet""" + chain = set([listUri]) + while listUri: + item = self.value(listUri, RDF.first) + if item: + yield item + listUri = self.value(listUri, RDF.rest) + if listUri in chain: + raise ValueError("List contains a recursive rdf:rest reference") + chain.add(listUri) + + + def contains(self, triple): + func = self._getCurrentFunc() + self._watchers.addTripleWatcher(func, triple) + return triple in self._graph + + def contextsForStatement(self, triple): + """currently this needs to be in an addHandler section, but it + sets no watchers so it won't actually update if the statement + was added or dropped from contexts""" + func = self._getCurrentFunc() + return contextsForStatementNoWildcards(self._graph, triple) + + # i find myself wanting 'patch' (aka enter/leave) versions of these calls that tell + # you only what results have just appeared or disappeared. I think + # I'm going to be repeating that logic a lot. Maybe just for the + # subjects(RDF.type, t) call + + +class _GraphWatchers(object): + """ + store the current handlers that care about graph changes + """ + def __init__(self): + self._handlersSp = {} # (s,p): set(handlers) + self._handlersPo = {} # (p,o): set(handlers) + self._handlersSpo = {} # (s,p,o): set(handlers) + self._handlersS = {} # s: set(handlers) + + def addSubjPredWatcher(self, func, s, p): + if func is None: + return + key = s, p + try: + self._handlersSp.setdefault(key, set()).add(func) + except Exception: + log.error("with key %r and func %r" % (key, func)) + raise + + def addPredObjWatcher(self, func, p, o): + self._handlersPo.setdefault((p, o), set()).add(func) + + def addTripleWatcher(self, func, triple): + self._handlersSpo.setdefault(triple, set()).add(func) + + def addSubjectWatcher(self, func, s): + self._handlersS.setdefault(s, set()).add(func) + + def whoCares(self, patch): + """what handler functions would care about the changes in this patch? 
+ + this removes the handlers that it gives you + """ + #self.dependencies() + ret = set() + affectedSubjPreds = set([(s, p) for s, p, o, c in patch.addQuads]+ + [(s, p) for s, p, o, c in patch.delQuads]) + for (s, p), funcs in self._handlersSp.iteritems(): + if (s, p) in affectedSubjPreds: + ret.update(funcs) + funcs.clear() + + affectedPredObjs = set([(p, o) for s, p, o, c in patch.addQuads]+ + [(p, o) for s, p, o, c in patch.delQuads]) + for (p, o), funcs in self._handlersPo.iteritems(): + if (p, o) in affectedPredObjs: + ret.update(funcs) + funcs.clear() + + affectedTriples = set([(s, p, o) for s, p, o, c in patch.addQuads]+ + [(s, p, o) for s, p, o, c in patch.delQuads]) + for triple, funcs in self._handlersSpo.iteritems(): + if triple in affectedTriples: + ret.update(funcs) + funcs.clear() + + affectedSubjs = set([s for s, p, o, c in patch.addQuads]+ + [s for s, p, o, c in patch.delQuads]) + for subj, funcs in self._handlersS.iteritems(): + if subj in affectedSubjs: + ret.update(funcs) + funcs.clear() + + return ret + + def dependencies(self): + """ + for debugging, make a list of all the active handlers and what + data they depend on. This is meant for showing on the web ui + for browsing. + """ + log.info("whocares:") + from pprint import pprint + pprint(self._handlersSp)
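Usage sketch for the handler pattern above (not part of the changeset; `graph` stands for any object built from these mixins, and the EX namespace is invented for illustration):

```python
from rdflib import RDFS, Namespace

EX = Namespace('http://example.com/')

def showLabel():
    # reading inside a handler registers an (s, p) watcher; a later patch
    # touching (EX['lamp1'], RDFS.label, *) makes whoCares() return this
    # function, and runDepsOnNewPatch reruns it
    showLabel.current = graph.value(EX['lamp1'], RDFS.label)

graph.addHandler(showLabel)  # runs once now, again after relevant patches
```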
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/clientsession.py	Sun Feb 18 08:06:43 2018 +0000
@@ -0,0 +1,17 @@
+"""
+some clients will support the concept of a named session that keeps
+multiple instances of that client separate
+"""
+from rdflib import URIRef
+from urllib import quote
+from light9 import showconfig
+
+def add_option(parser):
+    parser.add_option(
+        '-s', '--session',
+        help="name of session used for levels and window position",
+        default='default')
+
+def getUri(appName, opts):
+    return URIRef("%s/sessions/%s/%s" % (showconfig.showUri(), appName,
+                                         quote(opts.session, safe='')))
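A hypothetical caller, matching the optparse-style parser this expects (the app name and show URI are invented):

```python
from optparse import OptionParser
from light9.rdfdb import clientsession

parser = OptionParser()
clientsession.add_option(parser)
opts, args = parser.parse_args()
# with the default session and showUri() == http://example.com/show, this is
# URIRef('http://example.com/show/sessions/vidref/default')
sessionUri = clientsession.getUri('vidref', opts)
```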
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/currentstategraphapi.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,87 @@ +import logging, traceback, time, itertools +from rdflib import ConjunctiveGraph +from light9.rdfdb.rdflibpatch import contextsForStatement as rp_contextsForStatement +log = logging.getLogger("currentstate") + +class ReadOnlyConjunctiveGraph(object): + """similar to rdflib's ReadOnlyGraphAggregate but takes one CJ in, instead + of a bunch of Graphs""" + def __init__(self, graph): + self.graph = graph + + def __getattr__(self, attr): + if attr in ['subjects', 'value', 'objects', 'triples', 'label']: # not complete + return getattr(self.graph, attr) + raise TypeError("can't access %r of read-only graph" % attr) + + def __len__(self): + return len(self.graph) + + +class CurrentStateGraphApi(object): + """ + mixin for SyncedGraph, separated here because these methods work together + """ + + def currentState(self, context=None, tripleFilter=(None, None, None)): + """ + a graph you can read without being in an addHandler + + you can save some time by passing a triple filter, and we'll only give you the matching triples + """ + if context is not None: + raise NotImplementedError("currentState with context arg") + + class Mgr(object): + def __enter__(self2): + # this should be a readonly view of the existing + # graph, maybe with something to guard against + # writes/patches happening while reads are being + # done. Typical usage will do some reads on this graph + # before moving on to writes. + + if 1: + g = ReadOnlyConjunctiveGraph(self._graph) + else: + t1 = time.time() + g = ConjunctiveGraph() + for s,p,o,c in self._graph.quads(tripleFilter): + g.store.add((s,p,o), c) + + if tripleFilter == (None, None, None): + self2.logThisCopy(g, time.time() - t1) + + g.contextsForStatement = lambda t: contextsForStatementNoWildcards(g, t) + return g + + def logThisCopy(self, g, sec): + log.info("copied graph %s statements (%.1f ms) " + "because of this:" % (len(g), sec * 1000)) + for frame in traceback.format_stack(limit=4)[:-2]: + for line in frame.splitlines(): + log.info(" "+line) + + def __exit__(self, type, val, tb): + return + + return Mgr() + + def sequentialUri(self, prefix): + """ + Prefix URIRef like http://example.com/r- will return + http://example.com/r-1 if that uri is not a subject in the graph, + or else http://example.com/r-2, etc + """ + for i in itertools.count(1): + newUri = prefix + str(i) + if not list(self._graph.triples((newUri, None, None))) and newUri not in getattr(self, '_reservedSequentials', []): + if not hasattr(self, '_reservedSequentials'): + self._reservedSequentials = set() + self._reservedSequentials.add(newUri) + return newUri + + +def contextsForStatementNoWildcards(g, triple): + if None in triple: + raise NotImplementedError("no wildcards") + return rp_contextsForStatement(g, triple)
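Sketch of the intended read pattern (`graph` and the URIs are placeholders); note the returned view exposes only the methods whitelisted in ReadOnlyConjunctiveGraph.__getattr__:

```python
from rdflib import RDFS, URIRef

subj = URIRef('http://example.com/s')
with graph.currentState(tripleFilter=(subj, RDFS.label, None)) as current:
    label = current.value(subj, RDFS.label)
# no watchers were registered; use addHandler() when you need reruns
```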
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/file_vs_uri.py	Sun Feb 18 08:06:43 2018 +0000
@@ -0,0 +1,33 @@
+"""
+note that if there are ambiguities in dirUriMap, you'll get
+undefined results that depend on python's items() order.
+"""
+import os
+from rdflib import URIRef
+
+def uriFromFile(dirUriMap, filename):
+    assert filename.endswith('.n3'), filename
+    for d, prefix in dirUriMap.items():
+        if filename.startswith(d):
+            return URIRef(prefix + filename[len(d):-len('.n3')])
+    raise ValueError("filename %s doesn't start with any of %s" %
+                     (filename, dirUriMap.keys()))
+
+def fileForUri(dirUriMap, ctx):
+    assert isinstance(ctx, URIRef), ctx
+    for d, prefix in dirUriMap.items():
+        if ctx.startswith(prefix):
+            return d + ctx[len(prefix):] + '.n3'
+    raise ValueError("don't know what filename to use for %s" % ctx)
+
+def correctToTopdirPrefix(dirUriMap, inFile):
+    if not any(inFile.startswith(prefix) for prefix in dirUriMap):
+        for prefix in dirUriMap:
+            prefixAbs = os.path.abspath(prefix)
+            if inFile.startswith(prefixAbs):
+                inFile = prefix + inFile[len(prefixAbs):]
+                break
+        else:
+            raise ValueError("can't correct %s to start with one of %s" %
+                             (inFile, dirUriMap.keys()))
+    return inFile
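Round-trip sketch with an invented dirUriMap:

```python
from rdflib import URIRef
from light9.rdfdb.file_vs_uri import uriFromFile, fileForUri

dirUriMap = {'/home/user/show/': URIRef('http://example.com/show/')}
uri = uriFromFile(dirUriMap, '/home/user/show/song1.n3')
assert uri == URIRef('http://example.com/show/song1')
assert fileForUri(dirUriMap, uri) == '/home/user/show/song1.n3'
```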
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/grapheditapi.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,119 @@ +import random, logging +from itertools import chain +from rdflib import URIRef, RDF +from light9.rdfdb.patch import Patch, quadsWithContextUris +log = logging.getLogger('graphedit') + +class GraphEditApi(object): + """ + fancier graph edits + + mixin for SyncedGraph, separated here because these methods work together + """ + + def getObjectPatch(self, context, subject, predicate, newObject): + """send a patch which removes existing values for (s,p,*,c) + and adds (s,p,newObject,c). Values in other graphs are not affected. + + newObject can be None, which will remove all (subj,pred,*) statements. + """ + + existing = [] + for spo in self._graph.triples((subject, predicate, None), + context=context): + existing.append(spo+(context,)) + # what layer is supposed to cull out no-op changes? + return Patch( + delQuads=existing, + addQuads=([(subject, predicate, newObject, context)] + if newObject is not None else [])) + + def patchObject(self, context, subject, predicate, newObject): + p = self.getObjectPatch(context, subject, predicate, newObject) + log.info("patchObject %r" % p.jsonRepr) + self.patch(p) + + def patchSubgraph(self, context, newGraph): + """ + replace all statements in 'context' with the quads in newGraph. + This is not cooperating with currentState. + """ + old = set(quadsWithContextUris(self._graph.quads((None, None, None, context)))) + new = set(quadsWithContextUris(newGraph)) + p = Patch(delQuads=old - new, addQuads=new - old) + self.patch(p) + return p # for debugging + + def patchMapping(self, context, subject, predicate, nodeClass, keyPred, valuePred, newKey, newValue): + """ + creates/updates a structure like this: + + ?subject ?predicate [ + a ?nodeClass; + ?keyPred ?newKey; + ?valuePred ?newValue ] . + + There should be a complementary readMapping that gets you a + value since that's tricky too + """ + + # as long as currentState is expensive and has the + # tripleFilter optimization, this looks like a mess. If + # currentState became cheap, a lot of code here could go away. + + with self.currentState(tripleFilter=(subject, predicate, None)) as current: + adds = set([]) + for setting in current.objects(subject, predicate): + with self.currentState(tripleFilter=(setting, keyPred, None)) as current2: + + match = current2.value(setting, keyPred) == newKey + if match: + break + else: + setting = URIRef(subject + "/map/%s" % + random.randrange(999999999)) + adds.update([ + (subject, predicate, setting, context), + (setting, RDF.type, nodeClass, context), + (setting, keyPred, newKey, context), + ]) + + with self.currentState(tripleFilter=(setting, valuePred, None)) as current: + dels = set([]) + for prev in current.objects(setting, valuePred): + dels.add((setting, valuePred, prev, context)) + adds.add((setting, valuePred, newValue, context)) + + if adds != dels: + self.patch(Patch(delQuads=dels, addQuads=adds)) + + def removeMappingNode(self, context, node): + """ + removes the statements with this node as subject or object, which + is the right amount of statements to remove a node that + patchMapping made. 
+ """ + p = Patch(delQuads=[spo+(context,) for spo in + chain(self._graph.triples((None, None, node), + context=context), + self._graph.triples((node, None, None), + context=context))]) + self.patch(p) + +import unittest +from rdflib import ConjunctiveGraph +class TestPatchSubgraph(unittest.TestCase): + def testCollapsesIdenticalQuads(self): + appliedPatches = [] + class Obj(GraphEditApi): + def patch(self, p): + appliedPatches.append(p) + obj = Obj() + obj._graph = ConjunctiveGraph() + stmt1 = (URIRef('s'), URIRef('p'), URIRef('o'), URIRef('g')) + obj._graph.addN([stmt1]) + obj.patchSubgraph(URIRef('g'), [stmt1]) + self.assertEqual(len(appliedPatches), 1) + p = appliedPatches[0] + self.assert_(p.isNoop()) + self.assertEqual(p.jsonRepr, '{"patch": {"adds": "", "deletes": ""}}')
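Sketch of the two main entry points (the context, namespace, and values are invented; `graph` is a SyncedGraph carrying this mixin):

```python
from rdflib import Literal, Namespace, URIRef

EX = Namespace('http://example.com/')
ctx = URIRef('http://example.com/ctx')

# drop any (lamp1, brightness, *) statements in ctx and assert one new
# value; newObject=None would just delete them all
graph.patchObject(ctx, EX['lamp1'], EX['brightness'], Literal(0.5))

# maintain a [ a EX['Setting']; EX['key'] 'hue'; EX['value'] 0.2 ] node
graph.patchMapping(ctx, subject=EX['lamp1'], predicate=EX['setting'],
                   nodeClass=EX['Setting'], keyPred=EX['key'],
                   valuePred=EX['value'], newKey=Literal('hue'),
                   newValue=Literal(0.2))
```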
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/graphfile.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,252 @@ +import logging, traceback, os, time +from twisted.python.filepath import FilePath +from twisted.internet import reactor +from twisted.internet.inotify import humanReadableMask +from rdflib import Graph, RDF +from light9.rdfdb.patch import Patch +from light9.rdfdb.rdflibpatch import inContext + +log = logging.getLogger('graphfile') +iolog = logging.getLogger('io') + +def patchN3SerializerToUseLessWhitespace(cutColumn=65): + # todo: make a n3serializer subclass with whitespace settings + from rdflib.plugins.serializers.turtle import TurtleSerializer, OBJECT + originalWrite = TurtleSerializer.write + def write(self, s): + lines = s.split('\n') + if len(lines) > 1: + self._column = len(lines[-1]) + else: + self._column += len(lines[0]) + return originalWrite(self, s) + TurtleSerializer.write = write + def predicateList(self, subject, newline=False): + properties = self.buildPredicateHash(subject) + propList = self.sortProperties(properties) + if len(propList) == 0: + return + self.verb(propList[0], newline=newline) + self.objectList(properties[propList[0]]) + for predicate in propList[1:]: + self.write(';') + # can't do proper wrapping since we don't know how much is coming + if self._column > cutColumn: + self.write('\n' + self.indent(1)) + self.verb(predicate, newline=False) + self.objectList(properties[predicate]) + def objectList(self, objects): + count = len(objects) + if count == 0: + return + depthmod = (count == 1) and 0 or 1 + self.depth += depthmod + self.path(objects[0], OBJECT) + for obj in objects[1:]: + self.write(', ') + self.path(obj, OBJECT, newline=True) + self.depth -= depthmod + + originalStatement = TurtleSerializer.statement + def statement(self, subject): + if list(self.store.triples((subject, RDF.type, None))): + self.write('\n') + originalStatement(self, subject) + return False # suppress blank line for 'minor' statements + TurtleSerializer.statement = statement + TurtleSerializer.predicateList = predicateList + TurtleSerializer.objectList = objectList + +patchN3SerializerToUseLessWhitespace() + +class GraphFile(object): + """ + one rdf file that we read from, write to, and notice external changes to + """ + def __init__(self, notifier, path, uri, patch, getSubgraph, globalPrefixes, ctxPrefixes): + """ + uri is the context for the triples in this file. We assume + sometimes that we're the only ones with triples in this + context. + + this does not include an initial reread() call + + Prefixes are mutable dicts. The caller may add to them later. 
+ """ + self.path, self.uri = path, uri + self.patch, self.getSubgraph = patch, getSubgraph + + self.lastWriteTimestamp = 0 # mtime from the last time _we_ wrote + + self.globalPrefixes = globalPrefixes + self.ctxPrefixes = ctxPrefixes + self.readPrefixes = {} + + if not os.path.exists(path): + # can't start notify until file exists + try: + os.makedirs(os.path.dirname(path)) + except OSError: + pass + f = open(path, "w") + f.write("#new\n") + f.close() + iolog.info("%s created", path) + # this was supposed to cut out some extra reads but it + # didn't work: + self.lastWriteTimestamp = os.path.getmtime(path) + + + self.flushDelay = 2 # seconds until we have to call flush() when dirty + self.writeCall = None # or DelayedCall + + self.notifier = notifier + self.addWatch() + + def addWatch(self): + + # emacs save comes in as IN_MOVE_SELF, maybe + + # I was hoping not to watch IN_CHANGED and get lots of + # half-written files, but emacs doesn't close its files after + # a write, so there's no other event. I could try to sleep + # until after all the writes are done, but I think the only + # bug left is that we'll retry too agressively on a file + # that's being written + + from twisted.internet.inotify import IN_CLOSE_WRITE, IN_MOVED_FROM, IN_MODIFY, IN_DELETE, IN_DELETE_SELF, IN_CHANGED + + log.info("add watch on %s", self.path) + self.notifier.watch(FilePath(self.path), callbacks=[self.notify]) + + def notify(self, notifier, filepath, mask): + try: + maskNames = humanReadableMask(mask) + if maskNames[0] == 'delete_self': + if not filepath.exists(): + log.info("%s delete_self", filepath) + self.fileGone() + return + else: + log.warn("%s delete_self event but file is here. " + "probably a new version moved in", + filepath) + + # we could filter these out in the watch() call, but I want + # the debugging + if maskNames[0] in ['open', 'access', 'close_nowrite', 'attrib']: + log.debug("%s %s event, ignoring" % (filepath, maskNames)) + return + + try: + if filepath.getModificationTime() == self.lastWriteTimestamp: + log.debug("%s changed, but we did this write", filepath) + return + except OSError as e: + log.error("%s: %r" % (filepath, e)) + # getting OSError no such file, followed by no future reads + reactor.callLater(.5, self.addWatch) # ? + + return + + log.info("reread %s because of %s event", filepath, maskNames) + + self.reread() + except Exception: + traceback.print_exc() + + def fileGone(self): + """ + our file is gone; remove the statements from that context + """ + myQuads = [(s,p,o,self.uri) for s,p,o in self.getSubgraph(self.uri)] + log.debug("dropping all statements from context %s", self.uri) + if myQuads: + self.patch(Patch(delQuads=myQuads), dueToFileChange=True) + + def reread(self): + """update the graph with any diffs from this file + + n3 parser fails on "1.e+0" even though rdflib was emitting that itself + """ + old = self.getSubgraph(self.uri) + new = Graph() + try: + contents = open(self.path).read() + if contents.startswith("#new"): + log.debug("%s ignoring empty contents of my new file", self.path) + # this is a new file we're starting, and we should not + # patch our graph as if it had just been cleared. We + # shouldn't even be here reading this, but + # lastWriteTimestamp didn't work. 
+ return + + new.parse(location=self.path, format='n3') + self.readPrefixes = dict(new.namespaces()) + except SyntaxError as e: + print e + traceback.print_exc() + log.error("%s syntax error", self.path) + # todo: likely bug- if a file has this error upon first + # read, I think we don't retry it right. + return + except IOError as e: + log.error("%s rereading %s: %r", self.path, self.uri, e) + return + + old = inContext(old, self.uri) + new = inContext(new, self.uri) + + p = Patch.fromDiff(old, new) + if p: + log.debug("%s applying patch for changes in file", self.path) + self.patch(p, dueToFileChange=True) + else: + log.debug("old == new after reread of %s", self.path) + + def dirty(self, graph): + """ + there are new contents to write to our file + + graph is the rdflib.Graph that contains the contents of the + file. It is allowed to change. Note that dirty() will probably + do the save later when the graph might be different. + + after a timer has passed, write it out. Any scheduling issues + between files? i don't think so. the timer might be kind of + huge, and then we might want to take a hint from a client that + it's a good time to save the files that it was editing, like + when the mouse moves out of the client's window and might be + going towards a text file editor + + """ + log.info("%s dirty, needs write", self.path) + + self.graphToWrite = graph + if self.writeCall: + self.writeCall.reset(self.flushDelay) + else: + self.writeCall = reactor.callLater(self.flushDelay, self.flush) + + def flush(self): + self.writeCall = None + + tmpOut = self.path + ".rdfdb-temp" + f = open(tmpOut, 'w') + t1 = time.time() + for p, n in (self.globalPrefixes.items() + + self.readPrefixes.items() + + self.ctxPrefixes.items()): + self.graphToWrite.bind(p, n) + self.graphToWrite.serialize(destination=f, format='n3') + serializeTime = time.time() - t1 + f.close() + self.lastWriteTimestamp = os.path.getmtime(tmpOut) + os.rename(tmpOut, self.path) + iolog.info("%s rewrote in %.1f ms", + self.path, serializeTime * 1000) + + def __repr__(self): + return "%s(path=%r, uri=%r, ...)" % ( + self.__class__.__name__, self.path, self.uri) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/graphfile_test.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,37 @@ +import unittest +import mock +import tempfile +from rdflib import URIRef, Graph +from light9.rdfdb.graphfile import GraphFile + +class TestGraphFileOutput(unittest.TestCase): + def testMaintainsN3PrefixesFromInput(self): + tf = tempfile.NamedTemporaryFile(suffix='_test.n3') + tf.write(''' + @prefix : <http://example.com/> . + @prefix n: <http://example.com/n/> . + :foo n:bar :baz . + ''') + tf.flush() + + def getSubgraph(uri): + return Graph() + gf = GraphFile(mock.Mock(), tf.name, URIRef('uri'), mock.Mock(), getSubgraph, {}, {}) + gf.reread() + + newGraph = Graph() + newGraph.add((URIRef('http://example.com/boo'), + URIRef('http://example.com/n/two'), + URIRef('http://example.com/other/ns'))) + gf.dirty(newGraph) + gf.flush() + wroteContent = open(tf.name).read() + self.assertEqual('''@prefix : <http://example.com/> . +@prefix n: <http://example.com/n/> . +@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . +@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . +@prefix xml: <http://www.w3.org/XML/1998/namespace> . +@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . + +:boo n:two <http://example.com/other/ns> . +''', wroteContent)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/localsyncedgraph.py	Sun Feb 18 08:06:43 2018 +0000
@@ -0,0 +1,21 @@
+from rdflib import ConjunctiveGraph
+
+from light9.rdfdb.currentstategraphapi import CurrentStateGraphApi
+from light9.rdfdb.autodepgraphapi import AutoDepGraphApi
+from light9.rdfdb.grapheditapi import GraphEditApi
+from light9.rdfdb.rdflibpatch import patchQuads
+
+class LocalSyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi):
+    """for tests"""
+    def __init__(self, files=None):
+        self._graph = ConjunctiveGraph()
+        for f in files or []:
+            self._graph.parse(f, format='n3')
+
+
+    def patch(self, p):
+        patchQuads(self._graph,
+                   deleteQuads=p.delQuads,
+                   addQuads=p.addQuads,
+                   perfect=True)
+        # no deps
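Test-usage sketch. One caveat visible above: __init__ never calls AutoDepGraphApi.__init__, so addHandler() would fail on the missing currentFuncs attribute; direct reads via currentState() work:

```python
from rdflib import URIRef
from light9.rdfdb.localsyncedgraph import LocalSyncedGraph
from light9.rdfdb.patch import Patch

g = LocalSyncedGraph()
ctx = URIRef('http://example.com/ctx')
stmt = (URIRef('http://example.com/s'), URIRef('http://example.com/p'),
        URIRef('http://example.com/o'), ctx)
g.patch(Patch(addQuads=[stmt]))
with g.currentState() as current:
    assert current.value(stmt[0], stmt[1]) == stmt[2]
```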
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/mock_syncedgraph.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,50 @@ + +from rdflib import Graph, RDF, RDFS +from rdflib.parser import StringInputSource + +class MockSyncedGraph(object): + """ + Lets users of SyncedGraph mostly work. Doesn't yet help with any + testing of the rerun-upon-graph-change behavior. + """ + def __init__(self, n3Content): + self._graph = Graph() + self._graph.parse(StringInputSource(n3Content), format='n3') + + def addHandler(self, func): + func() + + def value(self, subject=None, predicate=RDF.value, object=None, + default=None, any=True): + if object is not None: + raise NotImplementedError() + return self._graph.value(subject, predicate, object=object, + default=default, any=any) + + def objects(self, subject=None, predicate=None): + return self._graph.objects(subject, predicate) + + def label(self, uri): + return self.value(uri, RDFS.label) + + def subjects(self, predicate=None, object=None): + return self._graph.subjects(predicate, object) + + def predicate_objects(self, subject): + return self._graph.predicate_objects(subject) + + def items(self, listUri): + """generator. Having a chain of watchers on the results is not + well-tested yet""" + chain = set([listUri]) + while listUri: + item = self.value(listUri, RDF.first) + if item: + yield item + listUri = self.value(listUri, RDF.rest) + if listUri in chain: + raise ValueError("List contains a recursive rdf:rest reference") + chain.add(listUri) + + def contains(self, triple): + return triple in self._graph
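Sketch: read-only handler code can run against MockSyncedGraph, whose addHandler just calls the function once:

```python
from rdflib import URIRef
from light9.rdfdb.mock_syncedgraph import MockSyncedGraph

g = MockSyncedGraph('@prefix : <http://example.com/> . :a :b :c .')
seen = []
g.addHandler(lambda: seen.append(
    g.value(URIRef('http://example.com/a'), URIRef('http://example.com/b'))))
assert seen == [URIRef('http://example.com/c')]
```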
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/patch.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,222 @@ +import json, unittest +from rdflib import ConjunctiveGraph, Graph, URIRef, URIRef as U, Literal +from light9.namespaces import XSD +from light9.rdfdb.rdflibpatch import graphFromNQuad, graphFromQuads, serializeQuad + +ALLSTMTS = (None, None, None) + +def quadsWithContextUris(quads): + """ + yield the given quads, correcting any context values that are + Graphs into URIRefs + """ + if isinstance(quads, ConjunctiveGraph): + quads = quads.quads(ALLSTMTS) + for s,p,o,c in quads: + if isinstance(c, Graph): + c = c.identifier + if not isinstance(c, URIRef): + raise TypeError("bad quad context type in %r" % ((s,p,o,c),)) + yield s,p,o,c + +class Patch(object): + """ + immutable + + the json representation includes the {"patch":...} wrapper + """ + def __init__(self, jsonRepr=None, addQuads=None, delQuads=None, + addGraph=None, delGraph=None): + """ + addQuads/delQuads can be lists or sets, but if we make them internally, + they'll be lists + + 4th element of a quad must be a URIRef + """ + self._jsonRepr = jsonRepr + self._addQuads, self._delQuads = addQuads, delQuads + self._addGraph, self._delGraph = addGraph, delGraph + + if self._jsonRepr is not None: + body = json.loads(self._jsonRepr) + self._delGraph = graphFromNQuad(body['patch']['deletes']) + self._addGraph = graphFromNQuad(body['patch']['adds']) + if 'senderUpdateUri' in body: + self.senderUpdateUri = body['senderUpdateUri'] + + def __str__(self): + def shorten(n): + if isinstance(n, Literal): + if n.datatype == XSD['double']: + return str(n.toPython()) + if isinstance(n, URIRef): + for long, short in [ + ("http://light9.bigasterisk.com/", "l9"), + + ]: + if n.startswith(long): + return short+":"+n[len(long):] + return n.n3() + def formatQuad(quad): + return " ".join(shorten(n) for n in quad) + delLines = [" -%s" % formatQuad(q) for q in self.delQuads] + addLines = [" +%s" % formatQuad(q) for q in self.addQuads] + return "\nPatch:\n" + "\n".join(delLines) + "\n" + "\n".join(addLines) + + def shortSummary(self): + return "[-%s +%s]" % (len(self.delQuads), len(self.addQuads)) + + @classmethod + def fromDiff(cls, oldGraph, newGraph): + """ + make a patch that changes oldGraph to newGraph + """ + old = set(quadsWithContextUris(oldGraph)) + new = set(quadsWithContextUris(newGraph)) + return cls(addQuads=list(new - old), delQuads=list(old - new)) + + def __nonzero__(self): + """ + does this patch do anything to a graph? 
+ """ + if self._jsonRepr and self._jsonRepr.strip(): + raise NotImplementedError() + return bool(self._addQuads or self._delQuads or + self._addGraph or self._delGraph) + + @property + def addQuads(self): + if self._addQuads is None: + if self._addGraph is None: + return [] + self._addQuads = list(quadsWithContextUris( + self._addGraph.quads(ALLSTMTS))) + return self._addQuads + + @property + def delQuads(self): + if self._delQuads is None: + if self._delGraph is None: + return [] + self._delQuads = list(quadsWithContextUris( + self._delGraph.quads(ALLSTMTS))) + return self._delQuads + + @property + def addGraph(self): + if self._addGraph is None: + self._addGraph = graphFromQuads(self.addQuads) + return self._addGraph + + @property + def delGraph(self): + if self._delGraph is None: + self._delGraph = graphFromQuads(self.delQuads) + return self._delGraph + + @property + def jsonRepr(self): + if self._jsonRepr is None: + self._jsonRepr = self.makeJsonRepr() + return self._jsonRepr + + def makeJsonRepr(self, extraAttrs={}): + d = {"patch" : { + 'adds' : serializeQuad(self.addGraph), + 'deletes' : serializeQuad(self.delGraph), + }} + if len(self.addGraph) > 0 and d['patch']['adds'].strip() == "": + # this is the bug that graphFromNQuad works around + raise ValueError("nquads serialization failure") + if '[<' in d['patch']['adds']: + raise ValueError("[< found in %s" % d['patch']['adds']) + d.update(extraAttrs) + return json.dumps(d) + + def concat(self, more): + """ + new Patch with the result of applying this patch and the + sequence of other Patches + """ + # not working yet + adds = set(self.addQuads) + dels = set(self.delQuads) + for p2 in more: + for q in p2.delQuads: + if q in adds: + adds.remove(q) + else: + dels.add(q) + for q in p2.addQuads: + if q in dels: + dels.remove(q) + else: + adds.add(q) + return Patch(delQuads=dels, addQuads=adds) + + def getContext(self): + """assumes that all the edits are on the same context""" + ctx = None + for q in self.addQuads + self.delQuads: + if ctx is None: + ctx = q[3] + + if ctx != q[3]: + raise ValueError("patch applies to multiple contexts, at least %r and %r" % (ctx, q[3])) + if ctx is None: + raise ValueError("patch affects no contexts") + assert isinstance(ctx, URIRef), ctx + return ctx + + def isNoop(self): + return set(self.addQuads) == set(self.delQuads) + + +stmt1 = U('http://a'), U('http://b'), U('http://c'), U('http://ctx1') + +class TestPatchFromDiff(unittest.TestCase): + def testEmpty(self): + g = ConjunctiveGraph() + p = Patch.fromDiff(g, g) + self.assert_(not p) + + def testNonEmpty(self): + g1 = ConjunctiveGraph() + g2 = graphFromQuads([stmt1]) + p = Patch.fromDiff(g1, g2) + self.assert_(p) + + def testNoticesAdds(self): + g1 = ConjunctiveGraph() + g2 = graphFromQuads([stmt1]) + p = Patch.fromDiff(g1, g2) + self.assertEqual(p.addQuads, [stmt1]) + self.assertEqual(p.delQuads, []) + + def testNoticesDels(self): + g1 = graphFromQuads([stmt1]) + g2 = ConjunctiveGraph() + p = Patch.fromDiff(g1, g2) + self.assertEqual(p.addQuads, []) + self.assertEqual(p.delQuads, [stmt1]) + + def testQuadSequenceOkInsteadOfGraph(self): + p = Patch.fromDiff([stmt1], ConjunctiveGraph()) + self.assertEqual(p.delQuads, [stmt1]) + p = Patch.fromDiff(ConjunctiveGraph(), [stmt1]) + self.assertEqual(p.addQuads, [stmt1]) + +class TestPatchGetContext(unittest.TestCase): + def testEmptyPatchCantGiveContext(self): + p = Patch() + self.assertRaises(ValueError, p.getContext) + + def testSimplePatchReturnsContext(self): + p = Patch(addQuads=[stmt1]) + 
self.assertEqual(p.getContext(), U('http://ctx1')) + + def testMultiContextPatchFailsToReturnContext(self): + p = Patch(addQuads=[stmt1[:3] + (U('http://ctx1'),), + stmt1[:3] + (U('http://ctx2'),)]) + self.assertRaises(ValueError, p.getContext) +
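Round-trip sketch of the two constructor forms (quads in, JSON out, and back):

```python
from rdflib import URIRef as U
from light9.rdfdb.patch import Patch

stmt = (U('http://a'), U('http://b'), U('http://c'), U('http://ctx1'))
p = Patch(addQuads=[stmt])
p2 = Patch(jsonRepr=p.jsonRepr)  # {"patch": {"adds": ..., "deletes": ...}}
assert p2.addQuads == [stmt] and p2.delQuads == []
```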
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/patchreceiver.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,65 @@ +import logging, cyclone.httpclient, traceback, urllib +from twisted.internet import reactor +from light9 import networking +from light9.rdfdb.patch import Patch +log = logging.getLogger('syncedgraph') + +class PatchReceiver(object): + """ + runs a web server in this process and registers it with the rdfdb + master. See onPatch for what happens when the rdfdb master sends + us a patch + """ + def __init__(self, rdfdbRoot, label, onPatch): + """ + label is what we'll call ourselves to the rdfdb server + + onPatch is what we call back when the server sends a patch + """ + self.rdfdbRoot = rdfdbRoot + listen = reactor.listenTCP(0, cyclone.web.Application(handlers=[ + (r'/update', makePatchEndpoint(onPatch)), + ])) + port = listen._realPortNumber # what's the right call for this? + + self.updateResource = 'http://%s:%s/update' % ( + networking.patchReceiverUpdateHost.value, port) + log.info("listening on %s" % port) + self._register(label) + + def _register(self, label): + url = self.rdfdbRoot + 'graphClients' + body = urllib.urlencode([('clientUpdate', self.updateResource), + ('label', label)]) + cyclone.httpclient.fetch( + url=url, + method='POST', + headers={'Content-Type': ['application/x-www-form-urlencoded']}, + postdata=body, + ).addCallbacks(self._done, + lambda err: self._registerError(err, url, body)) + log.info("registering with rdfdb at %s", url) + + def _registerError(self, err, url, body): + log.error('registering to url=%r body=%r', url, body) + log.error(err) + + def _done(self, x): + log.debug("registered with rdfdb") + + +def makePatchEndpointPutMethod(cb): + def put(self): + try: + p = Patch(jsonRepr=self.request.body) + log.debug("received patch -%d +%d" % (len(p.delGraph), len(p.addGraph))) + cb(p) + except: + traceback.print_exc() + raise + return put + +def makePatchEndpoint(cb): + class Update(cyclone.web.RequestHandler): + put = makePatchEndpointPutMethod(cb) + return Update
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/patchsender.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,120 @@ +import logging, time +import cyclone.httpclient +from twisted.internet import defer +log = logging.getLogger('syncedgraph') + +class PatchSender(object): + """ + SyncedGraph may generate patches faster than we can send + them. This object buffers and may even collapse patches before + they go the server + """ + def __init__(self, target, myUpdateResource): + """ + target is the URI we'll send patches to + + myUpdateResource is the URI for this sender of patches, which + maybe needs to be the actual requestable update URI for + sending updates back to us + """ + self.target = target + self.myUpdateResource = myUpdateResource + self._patchesToSend = [] + self._currentSendPatchRequest = None + + def sendPatch(self, p): + sendResult = defer.Deferred() + self._patchesToSend.append((p, sendResult)) + self._continueSending() + return sendResult + + def cancelAll(self): + self._patchesToSend[:] = [] + # we might be in the middle of a post; ideally that would be + # aborted, too. Since that's not coded yet, or it might be too late to + # abort, what should happen? + # 1. this could return deferred until we think our posts have stopped + # 2. or, other code could deal for the fact that cancelAll + # isn't perfect + + def _continueSending(self): + if not self._patchesToSend or self._currentSendPatchRequest: + return + if len(self._patchesToSend) > 1: + log.debug("%s patches left to send", len(self._patchesToSend)) + # this is where we could concatenate little patches into a + # bigger one. Often, many statements will cancel each + # other out. not working yet: + if 0: + p = self._patchesToSend[0].concat(self._patchesToSend[1:]) + print "concat down to" + print 'dels' + for q in p.delQuads: print q + print 'adds' + for q in p.addQuads: print q + print "----" + else: + p, sendResult = self._patchesToSend.pop(0) + else: + p, sendResult = self._patchesToSend.pop(0) + + self._currentSendPatchRequest = sendPatch( + self.target, p, senderUpdateUri=self.myUpdateResource) + self._currentSendPatchRequest.addCallbacks(self._sendPatchDone, + self._sendPatchErr) + self._currentSendPatchRequest.chainDeferred(sendResult) + + def _sendPatchDone(self, result): + self._currentSendPatchRequest = None + self._continueSending() + + def _sendPatchErr(self, e): + self._currentSendPatchRequest = None + # we're probably out of sync with the master now, since + # SyncedGraph.patch optimistically applied the patch to our + # local graph already. What happens to this patch? What + # happens to further pending patches? Some of the further + # patches, especially, may be commutable with the bad one and + # might still make sense to apply to the master graph. + + # if someday we are folding pending patches together, this + # would be the time to UNDO that and attempt the original + # separate patches again + + # this should screen for 409 conflict responses and raise a + # special exception for that, so SyncedGraph.sendFailed can + # screen for only that type + + # this code is going away; we're going to raise an exception that contains all the pending patches + log.error("_sendPatchErr") + log.error(e) + self._continueSending() + +def sendPatch(putUri, patch, **kw): + """ + PUT a patch as json to an http server. Returns deferred. 
+ + kwargs will become extra attributes in the toplevel json object + """ + t1 = time.time() + body = patch.makeJsonRepr(kw) + jsonTime = time.time() - t1 + intro = body[:200] + if len(body) > 200: + intro = intro + "..." + log.debug("send body (rendered %.1fkB in %.1fms): %s", len(body) / 1024, jsonTime * 1000, intro) + sendTime = time.time() + def putDone(done): + if not str(done.code).startswith('2'): + raise ValueError("sendPatch request failed %s: %s" % + (done.code, done.body)) + dt = 1000 * (time.time() - sendTime) + log.debug("sendPatch to %s took %.1fms" % (putUri, dt)) + return done + + return cyclone.httpclient.fetch( + url=putUri, + method='PUT', + headers={'Content-Type': ['application/json']}, + postdata=body, + ).addCallback(putDone)
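The collapsing that _continueSending leaves disabled would lean on Patch.concat, where an add followed by its own delete cancels out. A sketch of those semantics:

```python
from rdflib import URIRef as U
from light9.rdfdb.patch import Patch

stmt = (U('http://a'), U('http://b'), U('http://c'), U('http://ctx1'))
merged = Patch(addQuads=[stmt]).concat([Patch(delQuads=[stmt])])
assert merged.isNoop()  # nothing left to send
```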
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/rdflibpatch.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,201 @@ +""" +this is a proposal for a ConjunctiveGraph method in rdflib +""" +import sys +if sys.path[0] == '/usr/lib/python2.7/dist-packages': + # nosetests puts this in + sys.path = sys.path[1:] + +import unittest +from rdflib import ConjunctiveGraph, Graph, URIRef as U, Literal + +def patchQuads(graph, deleteQuads, addQuads, perfect=False): + """ + Delete the sequence of given quads. Then add the given quads just + like addN would. If perfect is True, we'll error before the + deletes or before the adds (not a real transaction) if any of the + deletes isn't in the graph or if any of the adds was already in + the graph. + + These input quads use URIRef for the context, but + Graph(identifier=) is also allowed (which is what you'll get + sometimes from rdflib APIs). + """ + toDelete = [] + for spoc in deleteQuads: + spoc = fixContextToUri(spoc) + + if perfect: + if inGraph(spoc, graph): + toDelete.append(spoc) + else: + raise ValueError("%r not in %r" % (spoc[:3], spoc[3])) + else: + graph.remove(spoc) + for spoc in toDelete: + graph.remove(spoc) + + if perfect: + addQuads = list(addQuads) + for spoc in addQuads: + spoc = fixContextToUri(spoc) + if inGraph(spoc, graph): + raise ValueError("%r already in %r" % (spoc[:3], spoc[3])) + graph.addN(addQuads) + +def fixContextToUri(spoc): + if not isinstance(spoc[3], U): + return spoc[:3] + (spoc[3].identifier,) + return spoc + +def inGraph(spoc, graph): + """ + c is just a URIRef. + Workaround for https://github.com/RDFLib/rdflib/issues/398 + """ + spoi = spoc[:3] + (Graph(identifier=spoc[3]),) + if spoi not in graph: + # this is a huge speedup, avoid many whole-graph scans + return False + return spoi in graph.quads() + +# some of the following workarounds may be fixed in https://github.com/RDFLib/rdflib/issues/299 +def graphFromQuads(q): + g = ConjunctiveGraph() + #g.addN(q) # no effect on nquad output + for s,p,o,c in q: + #g.get_context(c).add((s,p,o)) # kind of works with broken rdflib nquad serializer code; you need this for json_ld serialize to work :( + g.store.add((s,p,o), c) # no effect on nquad output + return g + +def graphFromNQuad(text): + g1 = ConjunctiveGraph() + # text might omit ctx on some lines. rdflib just puts in a bnode, which shows up later. + g1.parse(data=text, format='nquads') + return g1 + +from rdflib.plugins.serializers.nt import _quoteLiteral +def serializeQuad(g): + """ + replacement for graph.serialize(format='nquads') + + Still broken in rdflib 4.2.2: graph.serialize(format='nquads') + returns empty string for my graph in + TestGraphFromQuads.testSerializes. 
+ """ + out = [] + for s,p,o,c in g.quads((None,None,None)): + if isinstance(c, Graph): + # still not sure why this is Graph sometimes, + # already URIRef other times + c = c.identifier + if '[' in c.n3(): + import ipdb;ipdb.set_trace() + ntObject = _quoteLiteral(o) if isinstance(o, Literal) else o.n3() + out.append(u"%s %s %s %s .\n" % (s.n3(), + p.n3(), + ntObject, + c.n3())) + return ''.join(out) + +def inContext(graph, newContext): + """ + make a ConjunctiveGraph where all the triples in the given graph + (or collection) are now in newContext (a uri) + """ + return graphFromQuads((s,p,o,newContext) for s,p,o in graph) + +def contextsForStatement(graph, triple): + return [q[3] for q in graph.quads(triple)] + + +A = U("http://a"); B = U("http://b") +class TestInContext(unittest.TestCase): + def testResultHasQuads(self): + g = inContext([(A,A,A)], B) + self.assertEqual(list(g.quads())[0], (A,A,A,B)) + +class TestContextsForStatement(unittest.TestCase): + def testNotFound(self): + g = graphFromQuads([(A,A,A,A)]) + self.assertEqual(contextsForStatement(g, (B,B,B)), []) + def testOneContext(self): + g = graphFromQuads([(A,A,A,A), (A,A,B,B)]) + self.assertEqual(contextsForStatement(g, (A,A,A)), [A]) + def testTwoContexts(self): + g = graphFromQuads([(A,A,A,A), (A,A,A,B)]) + self.assertEqual(sorted(contextsForStatement(g, (A,A,A))), sorted([A,B])) + # There's a case where contextsForStatement was returning a Graph + # with identifier, which I've fixed without a test + + +class TestGraphFromQuads(unittest.TestCase): + nqOut = '<http://example.com/> <http://example.com/> <http://example.com/> <http://example.com/> .\n' + def testSerializes(self): + n = U("http://example.com/") + g = graphFromQuads([(n,n,n,n)]) + out = serializeQuad(g) + self.assertEqual(out.strip(), self.nqOut.strip()) + + def testNquadParserSerializes(self): + g = graphFromNQuad(self.nqOut) + self.assertEqual(len(g), 1) + out = serializeQuad(g) + self.assertEqual(out.strip(), self.nqOut.strip()) + + +A = U("http://a"); B = U("http://b"); C = U("http://c") +CTX1 = U('http://ctx1'); CTX2 = U('http://ctx2') +stmt1 = A, B, C, CTX1 +stmt2 = A, B, C, CTX2 +class TestPatchQuads(unittest.TestCase): + def testAddsToNewContext(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + self.assert_(len(g), 1) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, [(A, B, C, Graph(identifier=CTX1))]) + + def testDeletes(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + patchQuads(g, [stmt1], []) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, []) + + def testDeleteRunsBeforeAdd(self): + g = ConjunctiveGraph() + patchQuads(g, [stmt1], [stmt1]) + quads = list(g.quads((None,None,None))) + self.assertEqual(quads, [(A, B, C, Graph(identifier=CTX1))]) + + def testPerfectAddRejectsExistingStmt(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + self.assertRaises(ValueError, patchQuads, g, [], [stmt1], perfect=True) + + def testPerfectAddAllowsExistingStmtInNewContext(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1]) + patchQuads(g, [], [stmt2], perfect=True) + self.assertEqual(len(list(g.quads((None,None,None)))), 2) + + def testPerfectDeleteRejectsAbsentStmt(self): + g = ConjunctiveGraph() + self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True) + + def testPerfectDeleteRejectsStmtFromOtherGraph(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt2]) + self.assertRaises(ValueError, patchQuads, g, [stmt1], [], perfect=True) + + def 
testPerfectDeleteAllowsRemovalOfStmtInMultipleContexts(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1, stmt2]) + patchQuads(g, [stmt1], [], perfect=True) + + def testRedundantStmtOkForAddOrDelete(self): + g = ConjunctiveGraph() + patchQuads(g, [], [stmt1, stmt1], perfect=True) + patchQuads(g, [stmt1, stmt1], [], perfect=True) +
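Sketch of perfect mode, which validates every delete and add before mutating the graph:

```python
from rdflib import ConjunctiveGraph, URIRef as U
from light9.rdfdb.rdflibpatch import patchQuads

g = ConjunctiveGraph()
stmt = (U('http://a'), U('http://b'), U('http://c'), U('http://ctx1'))
patchQuads(g, deleteQuads=[], addQuads=[stmt], perfect=True)
try:
    patchQuads(g, [], [stmt], perfect=True)  # already present: rejected
except ValueError:
    pass
```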
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/rdflibpatch_literal.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,56 @@ +import sys +if sys.path[0] == '/usr/lib/python2.7/dist-packages': + # nosetests puts this in + sys.path = sys.path[1:] + +from rdflib.term import _PLAIN_LITERAL_TYPES, _XSD_DOUBLE, _XSD_DECIMAL, Literal +from re import sub + +def _literal_n3(self, use_plain=False, qname_callback=None): + if use_plain and self.datatype in _PLAIN_LITERAL_TYPES: + try: + self.toPython() # check validity + # this is a bit of a mess - + # in py >=2.6 the string.format function makes this easier + # we try to produce "pretty" output + if self.datatype == _XSD_DOUBLE: + # this is the drewp fix + return sub(r"\.?0*e","e", u'%e' % float(self)) + elif self.datatype == _XSD_DECIMAL: + return sub("0*$","0",u'%f' % float(self)) + else: + return u'%s' % self + except ValueError: + pass # if it's in, we let it out? + + encoded = self._quote_encode() + + datatype = self.datatype + quoted_dt = None + if datatype: + if qname_callback: + quoted_dt = qname_callback(datatype) + if not quoted_dt: + quoted_dt = "<%s>" % datatype + + language = self.language + if language: + if datatype: + # TODO: this isn't valid RDF (it's datatype XOR language) + return '%s@%s^^%s' % (encoded, language, quoted_dt) + return '%s@%s' % (encoded, language) + elif datatype: + return '%s^^%s' % (encoded, quoted_dt) + else: + return '%s' % encoded + +def patch(): + Literal._literal_n3 = _literal_n3 + +import unittest +class TestDoubleOutput(unittest.TestCase): + def testNoDanglingPoint(self): + vv = Literal("0.88", datatype=_XSD_DOUBLE) + out = _literal_n3(vv, use_plain=True) + print out + self.assert_(out in ["8.8e-01", "0.88"], out)
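What the monkeypatch changes, per TestDoubleOutput above: the "drewp fix" strips the decimal point and zero-run before the exponent, apparently to avoid outputs like "1.e+0" that the n3 parser rejects (see graphfile.reread). A sketch:

```python
from rdflib import Literal
from rdflib.term import _XSD_DOUBLE
from light9.rdfdb.rdflibpatch_literal import patch, _literal_n3

v = Literal("0.88", datatype=_XSD_DOUBLE)
out = _literal_n3(v, use_plain=True)
assert out in ("8.8e-01", "0.88")  # mirrors TestDoubleOutput
patch()  # installs this as Literal._literal_n3 process-wide
```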
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/syncedgraph.py Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,167 @@ +""" +client code uses a SyncedGraph, which has a few things: + +AutoDepGraphApi - knockoutjs-inspired API for querying the graph in a +way that lets me call you again when there were changes to the things +you queried + +CurrentStateGraphApi - a way to query the graph that doesn't gather +your dependencies like AutoDepGraphApi does + +GraphEditApi - methods to write patches to the graph for common +operations, e.g. replacing a value, or editing a mapping + +PatchReceiver - our web server that listens to edits from the master graph + +PatchSender - collects and transmits your graph edits +""" + +from rdflib import ConjunctiveGraph +import logging, cyclone.httpclient, traceback +from twisted.internet import defer +import treq, json +log = logging.getLogger('syncedgraph') +from light9.rdfdb.rdflibpatch import patchQuads + +from light9.rdfdb.patchsender import PatchSender +from light9.rdfdb.patchreceiver import PatchReceiver +from light9.rdfdb.currentstategraphapi import CurrentStateGraphApi +from light9.rdfdb.autodepgraphapi import AutoDepGraphApi +from light9.rdfdb.grapheditapi import GraphEditApi + +# everybody who writes literals needs to get this +from rdflibpatch_literal import patch +patch() + + +class SyncedGraph(CurrentStateGraphApi, AutoDepGraphApi, GraphEditApi): + """ + graph for clients to use. Changes are synced with the master graph + in the rdfdb process. + + This api is like rdflib.Graph but it can also call you back when + there are graph changes to the parts you previously read. + + You may want to attach to self.initiallySynced deferred so you + don't attempt patches before we've heard the initial contents of + the graph. It would be ok to accumulate some patches of new + material, but usually you won't correctly remove the existing + statements unless we have the correct graph. + + If we get out of sync, we abandon our local graph (even any + pending local changes) and get the data again from the + server. + """ + def __init__(self, rdfdbRoot, label): + """ + label is a string that the server will display in association + with your connection + """ + self.rdfdbRoot = rdfdbRoot + self.initiallySynced = defer.Deferred() + self._graph = ConjunctiveGraph() + + self._receiver = PatchReceiver(self.rdfdbRoot, label, self._onPatch) + + self._sender = PatchSender(self.rdfdbRoot + 'patches', + self._receiver.updateResource) + AutoDepGraphApi.__init__(self) + # this needs more state to track if we're doing a resync (and + # everything has to error or wait) or if we're live + + def resync(self): + """ + get the whole graph again from the server (e.g. we had a + conflict while applying a patch and want to return to the + truth). + + To avoid too much churn, we remember our old graph and diff it + against the replacement. This way, our callers only see the + corrections. + + Edits you make during a resync will surely be lost, so I + should just fail them. There should be a notification back to + UIs who want to show that we're doing a resync. 
+ """ + log.info('resync') + self._sender.cancelAll() + # this should be locked so only one resync goes on at once + return cyclone.httpclient.fetch( + url=self.rdfdbRoot + "graph", + method="GET", + headers={'Accept':['x-trig']}, + ).addCallback(self._resyncGraph) + + def _resyncGraph(self, response): + log.warn("new graph in") + + #diff against old entire graph + #broadcast that change + + def patch(self, p): + """send this patch to the server and apply it to our local + graph and run handlers""" + + if p.isNoop(): + log.info("skipping no-op patch") + return + + # these could fail if we're out of sync. One approach: + # Rerequest the full state from the server, try the patch + # again after that, then give up. + debugKey = '[id=%s]' % (id(p) % 1000) + log.debug("\napply local patch %s %s", debugKey, p) + try: + patchQuads(self._graph, + deleteQuads=p.delQuads, + addQuads=p.addQuads, + perfect=True) + except ValueError as e: + log.error(e) + self.sendFailed(None) + return + log.debug('runDepsOnNewPatch') + self.runDepsOnNewPatch(p) + log.debug('sendPatch') + self._sender.sendPatch(p).addErrback(self.sendFailed) + log.debug('patch is done %s', debugKey) + + def suggestPrefixes(self, ctx, prefixes): + """ + when writing files for this ctx, try to use these n3 + prefixes. async, not guaranteed to finish before any + particular file flush + """ + treq.post(self.rdfdbRoot + 'prefixes', json.dumps({'ctx': ctx, 'prefixes': prefixes})) + + def sendFailed(self, result): + """ + we asked for a patch to be queued and sent to the master, and + that ultimately failed because of a conflict + """ + log.warn("sendFailed") + self.resync() + + #i think we should receive back all the pending patches, + #do a resync here, + #then requeue all the pending patches (minus the failing one?) after that's done. + + def _onPatch(self, p): + """ + central server has sent us a patch + """ + log.debug('_onPatch server has sent us %s', p) + patchQuads(self._graph, p.delQuads, p.addQuads, perfect=True) + log.debug("graph now has %s statements" % len(self._graph)) + try: + self.runDepsOnNewPatch(p) + except Exception: + # don't reflect this error back to the server; we did + # receive its patch correctly. However, we're in a bad + # state since some dependencies may not have rerun + traceback.print_exc() + log.warn("some graph dependencies may not have completely run") + + if self.initiallySynced: + self.initiallySynced.callback(None) + self.initiallySynced = None
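Hypothetical client wiring (the rdfdbRoot URL, label, and statement URIs are invented); initiallySynced gates the first reads and writes, as the class docstring advises:

```python
from twisted.internet import reactor
from rdflib import RDFS, URIRef
from light9.rdfdb.syncedgraph import SyncedGraph

graph = SyncedGraph('http://localhost:8051/', label='demo-client')

def begin(_):
    # we now hold the server's full graph; reads made inside handlers
    # will rerun whenever later patches touch the same data
    graph.addHandler(lambda: graph.value(
        URIRef('http://example.com/s'), RDFS.label))

graph.initiallySynced.addCallback(begin)
reactor.run()
```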
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/web/graphView.html Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,42 @@ +<!DOCTYPE html> +<html> + <head> + <title>graphview</title> + <link rel="stylesheet" type="text/css" href="/style.css"> + + </head> + <body> + <!-- + this page is another graph client who maintains the whole + graph all the time + --> + + <div id="status">starting...</div> + + <fieldset> + <legend>Messages</legend> + <div id="out"></div> + </fieldset> + + <p>URI substring: <input type="text" id="uriSubstring"></p> + + <table> + <thead> + <tr> + <th>subject</th> <th>predicate</th> <th>object</th> <th>context</th> + </tr> + </thead> + <tbody> + </tbody> + </table> + + <script type="text/javascript" src="/lib/jquery/dist/jquery.min.js"></script> + <script type="text/javascript" src="/websocket.js"></script> + <script type="text/javascript" src="syncedgraph.js"></script> + <script type="text/javascript"> + $(function(){ + + }); + </script> + </body> +</html>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/web/index.html Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,58 @@ +<!DOCTYPE html> +<html> + <head> + <title>rdfdb</title> + <link rel="stylesheet" type="text/css" href="/style.css"> + <link rel="stylesheet" type="text/css" href="style.css"> + </head> + <body id="rdfdb"> + <h1>rdfdb</h1> + <div id="status">starting...</div> + + <section id="edits"> + <h2>Edits</h2> + <div id="patches"></div> + </section> + + <p>Clients: <span id="clients"></span></p> + + <section id="messages"> + <h2>Messages</h2> + <div id="out"></div> + </fieldset> + + <script type="text/javascript" src="/lib/jquery/dist/jquery.min.js"></script> + <script type="text/javascript" src="/websocket.js"></script> + <script type="text/javascript"> + $(function(){ + + function collapseCuries(s) { + // this is temporary. The correct thing is to parse the quad (or receive it as a tree) and then make links to the full URIs and display curies of them + + return s + .replace(/<http:\/\/www.w3.org\/2001\/XMLSchema#(.*?)>/g, function (match, short) { return "xsd:"+short; }) + .replace(/<http:\/\/light9.bigasterisk.com\/(.*?)>/g, function (match, short) { return "light9:"+short; }) + .replace(/<http:\/\/light9.bigasterisk.com\/show\/dance2012\/sessions\/(.*?)>/g, function (match, short) { return "kcsession:"+short }); + } + + function onMessage(d) { + if (d.clients !== undefined) { + $("#clients").empty().text(JSON.stringify(d.clients)); + } + if (d.patch !== undefined) { + $("#patches").prepend( + $("<fieldset>").addClass("patch") + .append($("<legend>").text("Patch")) + .append($("<div>").addClass("deletes").text(collapseCuries(d.patch.deletes))) + .append($("<div>").addClass("adds").text(collapseCuries(d.patch.adds))) + ); + } + + $('#out').append($('<div>').text(JSON.stringify(d))); + } + reconnectingWebSocket("live", onMessage); + }); + </script> + + </body> +</html>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rdfdb/web/style.css Sun Feb 18 08:06:43 2018 +0000 @@ -0,0 +1,54 @@ +body { + display: flex; + flex-direction: column; + position: absolute; + width: 100%; + height: 100%; + padding: 0px; + margin: 0; +} + +#edits { + flex-grow: 3; + display: flex; + flex-direction: column; +} +#messages { + flex-grow: 1; + display: flex; + flex-direction: column; + margin-bottom: 3px; +} + +/* rdfdb */ +#patches { + overflow-y: scroll; + flex: 1 1 0; +} +.patch { + border: 1px solid gray; + padding: 2px; + margin: 4px; +} +.patch > div { + font-family: monospace; + font-size: 90%; + white-space: pre-wrap; +} +.patch .adds { + color: #3AEA38; +} +.patch .deletes { + color: #FF2828; +} +#out { + white-space: pre-wrap; + overflow-y: auto; + /* height: 50px; */ + flex: 1 0 0; +} +.patch fieldset { + color: gray; + font-family: arial; + font-size: 75%; +}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rdfdb/web/syncedgraph.js	Sun Feb 18 08:06:43 2018 +0000
@@ -0,0 +1,29 @@
+function SyncedGraph(label) {
+    /*
+      like python SyncedGraph but talks over a websocket to
+      rdfdb. This one has an API more conducive to reading and
+      querying.
+
+      light9/web/graph.coffee is the newer attempt
+    */
+    var self = this;
+
+
+
+    self.patch = function (p) {
+        throw new Error("not implemented");
+    };
+    self.nodesWithSubstring = function (subString) {
+
+    };
+    self.quads = function (s, p, o, c) {
+        // any args can be null for wildcard
+    };
+
+
+    function onMessage(d) {
+        $('#out').append($('<div>').text(JSON.stringify(d)));
+    }
+
+    reconnectingWebSocket("liveSyncedGraph", onMessage);
+}