Mercurial > code > home > repos > streamed-graph
diff src/SourceGraph.ts @ 139:cf642d395be4
new simpler Patch class; fancier 'hide' view config support
author | drewp@bigasterisk.com |
---|---|
date | Mon, 08 May 2023 13:05:20 -0700 |
parents | a8939c717015 |
children |
line wrap: on
line diff
--- a/src/SourceGraph.ts Sat May 06 15:35:11 2023 -0700 +++ b/src/SourceGraph.ts Mon May 08 13:05:20 2023 -0700 @@ -1,7 +1,17 @@ import { JsonLdParser } from "jsonld-streaming-parser"; import { Parser, Store } from "n3"; import { SubEvent } from "sub-events"; -import { Patch } from "./Patch"; +import { Patch, PatchDirection } from "./Patch"; + +type JsonLdData = Object; + +interface CombinedPatchBody { + adds?: JsonLdData; + deletes?: JsonLdData; +} +interface CombinedPatchEvent { + patch: CombinedPatchBody; +} // Possibly-changing graph for one source. Maintains its own dataset. // Read-only. @@ -22,6 +32,7 @@ // async reloadRdf() { const resp = await fetch(this.url); + this.clearStore(); const ctype = resp.headers.get("content-type"); if (ctype?.startsWith("text/event-stream")) { await this.reloadEventStream(); @@ -30,43 +41,85 @@ } } + private clearStore() { + this.store.removeMatches(null, null, null, null); + } + + async makePatchFromParsed( + dir: PatchDirection, + jsonLdObj: Object | undefined + ): Promise<Patch> { + const p = new Patch(dir); + if (jsonLdObj === undefined) { + return p; + } + const parser = new JsonLdParser(); + + parser.write(JSON.stringify(jsonLdObj)); + parser.end(); + + await p.streamImport(parser); + return p; + } + + private applyPatch(p: Patch) { + if (p.isEmpty()){ + return; + } + p.applyToStore(this.store); + if (this.sourceGraphChanged.getStat().unnamed == 0) { + console.warn("no listeners to this sourceGraphChanged event"); + } + this.sourceGraphChanged.emit(p); + } + private async reloadEventStream(): Promise<void> { return new Promise((res, rej) => { - // todo deal with reconnects const ev = new EventSource(this.url); - let firstPatch = true; - // clear store here? - // maybe the event messages should be 'add' and 'del', - // for less parsing and clearer order of ops. - ev.addEventListener("patch", async (ev) => { - // this is reentrant- ok? - - const patchMsg = JSON.parse((ev as any).data); - - const p = new Patch(); - + // old-style fullGraph. I now think it would be better to send plain + // adds and for this SourceGraph to emptyGraph when there's a new connection. + ev.addEventListener("fullGraph", async (ev) => { + this.clearStore(); + const p = new Patch(PatchDirection.ADD); const parser = new JsonLdParser(); - parser.write(JSON.stringify(patchMsg.patch.adds)); + parser.write(ev.data); parser.end(); await p.streamImport(parser); + p.applyToStore(this.store); this.isCurrent = true; - - p.applyToStore(this.store); - console.log("patchlife0: eventsream store changed"); - // firing before there are listeners this.sourceGraphChanged.emit(p); - if (firstPatch) { - firstPatch = false; - res(); - } + res(); }); + + // this is for old-style dels-then-adds patches + ev.addEventListener("patch", async (ev) => { + // this is reentrant- patches might be able to get applied out of order! + + const patchMsg: CombinedPatchEvent = JSON.parse((ev as any).data); + + const p0 = await this.makePatchFromParsed( + PatchDirection.DEL, + patchMsg.patch.deletes + ); + const p1 = await this.makePatchFromParsed( + PatchDirection.ADD, + patchMsg.patch.adds + ); + + this.applyPatch(p0); + this.applyPatch(p1); + + this.isCurrent = true; + }); + // here, add listeners for new-style add/del patches }); } private async reloadSimpleFile(resp: Response) { const bodyText = await resp.text(); + const p = new Patch(PatchDirection.ADD); const parser = new Parser({ format: "application/trig" }); await new Promise<void>((res, rej) => { parser.parse(bodyText, (error, quad, prefixes) => { @@ -76,16 +129,14 @@ return; } if (quad) { - this.store.addQuad(quad); // prob want to do this as one or a small number of patches + p.quads.push(quad); } else { res(); } }); }); + this.applyPatch(p); this.isCurrent = true; - // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. - console.log("patchlife0: simple file store changed"); - this.sourceGraphChanged.emit(new Patch()); } quadCount(): number {