Mercurial > code > home > repos > streamed-graph
view src/SourceGraph.ts @ 128:5a1a79f54779
big rewrite
author | drewp@bigasterisk.com |
---|---|
date | Fri, 05 May 2023 21:26:36 -0700 |
parents | |
children | 73a70d00fb74 |
line wrap: on
line source
import { JsonLdParser } from "jsonld-streaming-parser"; import { Parser, Store } from "n3"; import { SubEvent } from "sub-events"; import { Patch } from "./Patch"; // Possibly-changing graph for one source. Maintains its own dataset. // Read-only. // // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? export class SourceGraph { store: Store; // const; do not rebuild isCurrent: boolean = false; sourceGraphChanged: SubEvent<Patch> = new SubEvent(); constructor(public url: string /* immutable */) { this.store = new Store(); } dispose() {} // Call this first, after you've subscribed to `sourceGraphChanged`. This call may // synchronously fire change events. // async reloadRdf() { const resp = await fetch(this.url); const ctype = resp.headers.get("content-type"); if (ctype?.startsWith("text/event-stream")) { await this.reloadEventStream(); } else { await this.reloadSimpleFile(resp); } } private async reloadEventStream(): Promise<void> { return new Promise((res, rej) => { // todo deal with reconnects const ev = new EventSource(this.url); let firstPatch = true; // clear store here? // maybe the event messages should be 'add' and 'del', // for less parsing and clearer order of ops. ev.addEventListener("patch", async (ev) => { const patchMsg = JSON.parse(ev.data); const p = new Patch(); const parser = new JsonLdParser(); parser.write(JSON.stringify(patchMsg.patch.adds)); parser.end(); await p.streamImport(parser); this.isCurrent = true; p.applyToStore(this.store); console.log("patchlife0: eventsream store changed"); this.sourceGraphChanged.emit(p); if (firstPatch) { firstPatch = false; res(); } }); }); } private async reloadSimpleFile(resp: Response) { const bodyText = await resp.text(); const parser = new Parser({ format: "application/trig" }); await new Promise<void>((res, rej) => { parser.parse(bodyText, (error, quad, prefixes) => { if (error) { console.log("parse ~ error:", error); rej(error); return; } if (quad) { this.store.addQuad(quad); // prob want to do this as one or a small number of patches } else { res(); } }); }); this.isCurrent = true; // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. console.log("patchlife0: simple file store changed"); this.sourceGraphChanged.emit(new Patch()); } quadCount(): number { return this.store.countQuads(null, null, null, null); } }