import * as async from "async"; import debug from "debug"; import * as N3 from "n3"; import { NamedNode, Parser, Quad, Writer } from "n3"; import * as Immutable from "immutable"; export interface QuadPattern { subject: N3.Quad_Subject | null; predicate: N3.Quad_Predicate | null; object: N3.Quad_Object | null; // literals allowed? needs review. probably 'yes'. graph: N3.Quad_Graph | null; } const log = debug("patch"); export class Patch { // immutable private dels: Immutable.Set; private adds: Immutable.Set; private _allPredsCache?: Immutable.Set; private _allSubjsCache?: Immutable.Set; constructor(dels: Iterable, adds: Iterable) { this.dels = Immutable.Set(dels); this.adds = Immutable.Set(adds); this.validate(); } private validate() { // todo: finish porting this from coffeescript this.adds.union(this.dels).forEach((q: Quad) => { if (!q.equals) { throw new Error("doesn't look like a proper Quad"); } if (!q.subject.id || q.graph.id == null || q.predicate.id == null) { throw new Error(`corrupt patch: ${JSON.stringify(q)}`); } if ( q.object.termType == "Literal" && (q.object.datatypeString == "http://www.w3.org/2001/XMLSchema#float" || q.object.datatypeString == "http://www.w3.org/2001/XMLSchema#double") ) { throw new Error(`${JSON.stringify(q)} is using non-decimal for numbers, which is going to break some comparisons`); } }); } matches(pat: QuadPattern): boolean { const allQuads = this.dels.concat(this.adds); return allQuads.some((quad) => { return ( (pat.subject === null || pat.subject.equals(quad.subject)) && // (pat.predicate === null || pat.predicate.equals(quad.predicate)) && // (pat.object === null || pat.object.equals(quad.object)) && // (pat.graph === null || pat.graph.equals(quad.graph)) ); }); } isEmpty() { return Immutable.is(this.dels, this.adds); } applyToGraph(g: N3.Store) { for (let quad of this.dels) { g.removeQuad(quad); } for (let quad of this.adds) { g.addQuad(quad); } } update(other: Patch): Patch { // this is approx, since it doesnt handle cancelling existing quads. return new Patch(this.dels.union(other.dels), this.adds.union(other.adds)); } summary(): string { return "-" + this.dels.size + " +" + this.adds.size; } dump(): string { if (this.dels.size + this.adds.size > 20) { return this.summary(); } const lines: string[] = []; const s = (term: N3.Term): string => { if (term.termType == "Literal") return term.value; if (term.termType == "NamedNode") return term.value .replace("http://light9.bigasterisk.com/effect/", "effect:") .replace("http://light9.bigasterisk.com/", ":") .replace("http://www.w3.org/2000/01/rdf-schema#", "rdfs:") .replace("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdf:"); if (term.termType == "BlankNode") return "_:" + term.value; return term.id; }; const delPrefix = "- ", addPrefix = "\u200B+ "; // dels to sort before adds this.dels.forEach((d) => lines.push(delPrefix + s(d.subject) + " " + s(d.predicate) + " " + s(d.object))); this.adds.forEach((d) => lines.push(addPrefix + s(d.subject) + " " + s(d.predicate) + " " + s(d.object))); lines.sort(); return lines.join("\n") + "\n" + (this.isEmpty() ? "(empty)" : "(nonempty)"); } async toJsonPatch(): Promise { return new Promise((res, rej) => { const out: SyncgraphPatchMessage = { patch: { adds: "", deletes: "" } }; const writeDels = (cb1: () => void) => { const writer = new Writer({ format: "N-Quads" }); writer.addQuads(this.dels.toArray()); writer.end(function (err: any, result: string) { out.patch.deletes = result; cb1(); }); }; const writeAdds = (cb2: () => void) => { const writer = new Writer({ format: "N-Quads" }); writer.addQuads(this.adds.toArray()); writer.end(function (err: any, result: string) { out.patch.adds = result; cb2(); }); }; async.parallel([writeDels, writeAdds], (err: any) => res(JSON.stringify(out))); }); } containsAnyPreds(preds: Iterable): boolean { if (this._allPredsCache === undefined) { this._allPredsCache = Immutable.Set(); this._allPredsCache.withMutations((cache) => { for (let qq of [this.adds, this.dels]) { for (let q of Array.from(qq)) { cache.add(q.predicate.value); } } }); } for (let p of preds) { if (this._allPredsCache.has(p.value)) { return true; } } return false; } allSubjs(): Immutable.Set { // returns subjs as Set of strings if (this._allSubjsCache === undefined) { this._allSubjsCache = Immutable.Set(); this._allSubjsCache.withMutations((cache) => { for (let qq of [this.adds, this.dels]) { for (let q of Array.from(qq)) { cache.add(q.subject.value); } } }); } return this._allSubjsCache; } allPreds(): Immutable.Set { // todo: this could cache const ret = Immutable.Set(); ret.withMutations((r) => { for (let qq of [this.adds, this.dels]) { for (let q of Array.from(qq)) { if (q.predicate.termType == "Variable") throw "unsupported"; r.add(q.predicate); } } }); return ret; } } // The schema of the json sent from graph server. export interface SyncgraphPatchMessage { patch: { adds: string; deletes: string }; } export function patchToDeleteEntireGraph(g: N3.Store) { return new Patch(g.getQuads(null, null, null, null), []); } export function parseJsonPatch(input: SyncgraphPatchMessage, cb: (p: Patch) => void): void { // note response cb doesn't have an error arg. const dels: Quad[] = []; const adds: Quad[] = []; const parseAdds = (cb2: () => any) => { const parser = new Parser(); return parser.parse(input.patch.adds, (error: any, quad: Quad, prefixes: any) => { if (quad) { return adds.push(quad); } else { return cb2(); } }); }; const parseDels = (cb3: () => any) => { const parser = new Parser(); return parser.parse(input.patch.deletes, (error: any, quad: any, prefixes: any) => { if (quad) { return dels.push(quad); } else { return cb3(); } }); }; // todo: is it faster to run them in series? might be async.parallel([parseAdds, parseDels], (err: any) => cb(new Patch(dels, adds))); }