Mercurial > code > home > repos > streamed-graph
view src/SourceGraph.ts @ 139:cf642d395be4
new simpler Patch class; fancier 'hide' view config support
author | drewp@bigasterisk.com |
---|---|
date | Mon, 08 May 2023 13:05:20 -0700 |
parents | a8939c717015 |
children |
line wrap: on
line source
import { JsonLdParser } from "jsonld-streaming-parser"; import { Parser, Store } from "n3"; import { SubEvent } from "sub-events"; import { Patch, PatchDirection } from "./Patch"; type JsonLdData = Object; interface CombinedPatchBody { adds?: JsonLdData; deletes?: JsonLdData; } interface CombinedPatchEvent { patch: CombinedPatchBody; } // Possibly-changing graph for one source. Maintains its own dataset. // Read-only. // // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? export class SourceGraph { store: Store; // const; do not rebuild isCurrent: boolean = false; sourceGraphChanged: SubEvent<Patch> = new SubEvent(); constructor(public url: string /* immutable */) { this.store = new Store(); } dispose() {} // Call this first, after you've subscribed to `sourceGraphChanged`. This call may // synchronously fire change events. // async reloadRdf() { const resp = await fetch(this.url); this.clearStore(); const ctype = resp.headers.get("content-type"); if (ctype?.startsWith("text/event-stream")) { await this.reloadEventStream(); } else { await this.reloadSimpleFile(resp); } } private clearStore() { this.store.removeMatches(null, null, null, null); } async makePatchFromParsed( dir: PatchDirection, jsonLdObj: Object | undefined ): Promise<Patch> { const p = new Patch(dir); if (jsonLdObj === undefined) { return p; } const parser = new JsonLdParser(); parser.write(JSON.stringify(jsonLdObj)); parser.end(); await p.streamImport(parser); return p; } private applyPatch(p: Patch) { if (p.isEmpty()){ return; } p.applyToStore(this.store); if (this.sourceGraphChanged.getStat().unnamed == 0) { console.warn("no listeners to this sourceGraphChanged event"); } this.sourceGraphChanged.emit(p); } private async reloadEventStream(): Promise<void> { return new Promise((res, rej) => { const ev = new EventSource(this.url); // old-style fullGraph. I now think it would be better to send plain // adds and for this SourceGraph to emptyGraph when there's a new connection. ev.addEventListener("fullGraph", async (ev) => { this.clearStore(); const p = new Patch(PatchDirection.ADD); const parser = new JsonLdParser(); parser.write(ev.data); parser.end(); await p.streamImport(parser); p.applyToStore(this.store); this.isCurrent = true; this.sourceGraphChanged.emit(p); res(); }); // this is for old-style dels-then-adds patches ev.addEventListener("patch", async (ev) => { // this is reentrant- patches might be able to get applied out of order! const patchMsg: CombinedPatchEvent = JSON.parse((ev as any).data); const p0 = await this.makePatchFromParsed( PatchDirection.DEL, patchMsg.patch.deletes ); const p1 = await this.makePatchFromParsed( PatchDirection.ADD, patchMsg.patch.adds ); this.applyPatch(p0); this.applyPatch(p1); this.isCurrent = true; }); // here, add listeners for new-style add/del patches }); } private async reloadSimpleFile(resp: Response) { const bodyText = await resp.text(); const p = new Patch(PatchDirection.ADD); const parser = new Parser({ format: "application/trig" }); await new Promise<void>((res, rej) => { parser.parse(bodyText, (error, quad, prefixes) => { if (error) { console.log("parse ~ error:", error); rej(error); return; } if (quad) { p.quads.push(quad); } else { res(); } }); }); this.applyPatch(p); this.isCurrent = true; } quadCount(): number { return this.store.countQuads(null, null, null, null); } }