Mercurial > code > home > repos > streamed-graph
diff src/layout/streamed_graph_client.ts @ 106:2468f2227d22
make src/layout/ and src/render/ separation
author | drewp@bigasterisk.com |
---|---|
date | Sun, 13 Mar 2022 22:00:30 -0700 |
parents | src/streamed_graph_client.ts@910e2037d72d |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/layout/streamed_graph_client.ts Sun Mar 13 22:00:30 2022 -0700 @@ -0,0 +1,142 @@ +import { eachJsonLdQuad } from "./json_ld_quads"; +import { Store } from "n3"; + +export class StreamedGraphClient { + // holds a n3 Store, which is synced to a server-side + // store that sends patches over SSE + + onStatus: (msg: string) => void = function (m) {}; + onGraphChanged: () => void = function () {}; + store: Store; + _deletedCount: number = 0; + events!: EventSource; + constructor( + eventsUrl: string, + onGraphChanged: () => void, + onStatus: (status: string) => void, + prefixes: Array<Record<string, string>>, + staticGraphUrls: Array<string> + ) { + console.log("new StreamedGraph", eventsUrl); + this.onStatus = onStatus; + this.onGraphChanged = onGraphChanged; + this.onStatus("startup..."); + + this.store = new Store(); + + // Object.keys(prefixes).forEach((prefix) => { + // this.store.setPrefix(prefix, prefixes[prefix]); + // }); + + this.connect(eventsUrl); + this.reconnectOnWake(); + + // staticGraphUrls.forEach((url) => { + // fetch(url).then((response) => response.text()) + // .then((body) => { + // // parse with n3, add to output + // }); + // }); + } + + _vacuum() { + // workaround for the growing _ids map + this.store = new Store(this.store.getQuads(null, null, null, null)); + } + + reconnectOnWake() { + // it's not this, which fires on every mouse-in on a browser window, and doesn't seem to work for screen-turned-back-on + //window.addEventListener('focus', function() { this.connect(eventsUrl); }.bind(this)); + } + + connect(eventsUrl: string) { + // need to exit here if this obj has been replaced + + this.onStatus("start connect..."); + this.close(); + if (this.events && this.events.readyState != EventSource.CLOSED) { + this.onStatus("zombie"); + throw new Error("zombie eventsource"); + } + + this.events = new EventSource(eventsUrl, { withCredentials: true }); + + this.events.addEventListener("error", (ev) => { + // todo: this is piling up tons of retries and eventually multiple connections + this.testEventUrl(eventsUrl); + this.onStatus("connection lost- retrying"); + setTimeout(() => { + requestAnimationFrame(() => { + this.connect(eventsUrl); + }); + }, 3000); + }); + + this.events.addEventListener("fullGraph", async (ev) => { + this.onStatus("sync- full graph update"); + await this.replaceFullGraph((ev as MessageEvent).data); + this.onStatus(`synced ${this.store.size}`); + this.onGraphChanged(); + }); + + this.events.addEventListener("patch", async (ev) => { + this.onStatus("sync- updating"); + await this.patchGraph((ev as MessageEvent).data); + window.setTimeout(() => { + this.onStatus(`synced ${this.store.size}`); + }, 60); + this.onGraphChanged(); + }); + this.onStatus("connecting..."); + } + + // these need some locks + async replaceFullGraph(jsonLdText: string) { + this.store = new Store(); + await eachJsonLdQuad( + JSON.parse(jsonLdText), + this.store.addQuad.bind(this.store) + ); + } + + async patchGraph(patchJson: string) { + var patch = JSON.parse(patchJson).patch; + + await eachJsonLdQuad(patch.deletes, (quad) => { + this.store.removeQuad(quad); + this._deletedCount++; + }); + await eachJsonLdQuad(patch.adds, this.store.addQuad.bind(this.store)); + + if (this._deletedCount > 100) { + this._vacuum(); + this._deletedCount = 0; + } + } + + close() { + if (this.events) { + this.events.close(); + } + } + + async testEventUrl(eventsUrl: string): Promise<void> { + return new Promise<void>((resolve, reject) => { + this.onStatus("testing connection"); + fetch(eventsUrl, { + method: "HEAD", + credentials: "include", + }) + .then((value) => { + if (value.status == 403) { + reject(); + return; + } + resolve(); + }) + .catch((err) => { + reject(); + }); + }); + } +}