Mercurial > code > home > repos > streamed-graph
diff src/SourceGraph.ts @ 128:5a1a79f54779
big rewrite
author | drewp@bigasterisk.com |
---|---|
date | Fri, 05 May 2023 21:26:36 -0700 |
parents | |
children | 73a70d00fb74 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/SourceGraph.ts Fri May 05 21:26:36 2023 -0700 @@ -0,0 +1,91 @@ +import { JsonLdParser } from "jsonld-streaming-parser"; +import { Parser, Store } from "n3"; +import { SubEvent } from "sub-events"; +import { Patch } from "./Patch"; + +// Possibly-changing graph for one source. Maintains its own dataset. +// Read-only. +// +// Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? +export class SourceGraph { + store: Store; // const; do not rebuild + isCurrent: boolean = false; + sourceGraphChanged: SubEvent<Patch> = new SubEvent(); + constructor(public url: string /* immutable */) { + this.store = new Store(); + } + + dispose() {} + + // Call this first, after you've subscribed to `sourceGraphChanged`. This call may + // synchronously fire change events. + // + async reloadRdf() { + const resp = await fetch(this.url); + const ctype = resp.headers.get("content-type"); + if (ctype?.startsWith("text/event-stream")) { + await this.reloadEventStream(); + } else { + await this.reloadSimpleFile(resp); + } + } + + private async reloadEventStream(): Promise<void> { + return new Promise((res, rej) => { + // todo deal with reconnects + const ev = new EventSource(this.url); + let firstPatch = true; + // clear store here? + + // maybe the event messages should be 'add' and 'del', + // for less parsing and clearer order of ops. + ev.addEventListener("patch", async (ev) => { + const patchMsg = JSON.parse(ev.data); + + const p = new Patch(); + + const parser = new JsonLdParser(); + parser.write(JSON.stringify(patchMsg.patch.adds)); + parser.end(); + + await p.streamImport(parser); + this.isCurrent = true; + + p.applyToStore(this.store); + console.log("patchlife0: eventsream store changed"); + this.sourceGraphChanged.emit(p); + if (firstPatch) { + firstPatch = false; + res(); + } + }); + }); + } + + private async reloadSimpleFile(resp: Response) { + const bodyText = await resp.text(); + const parser = new Parser({ format: "application/trig" }); + await new Promise<void>((res, rej) => { + parser.parse(bodyText, (error, quad, prefixes) => { + if (error) { + console.log("parse ~ error:", error); + rej(error); + return; + } + if (quad) { + this.store.addQuad(quad); // prob want to do this as one or a small number of patches + } else { + res(); + } + }); + }); + this.isCurrent = true; + // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. + console.log("patchlife0: simple file store changed"); + this.sourceGraphChanged.emit(new Patch()); + } + + quadCount(): number { + return this.store.countQuads(null, null, null, null); + } +}