# HG changeset patch # User drewp@bigasterisk.com # Date 2023-05-28 06:05:04 # Node ID 7c94348d61276cb1f7cf65ebda6e54eea3349d71 # Parent b0be0315426dfc87e26a89ceee802c54850b2b8a big refactor of RdfDbClient, separating the websocket layer diff --git a/light9/web/rdfdbclient.ts b/light9/web/rdfdbclient.ts --- a/light9/web/rdfdbclient.ts +++ b/light9/web/rdfdbclient.ts @@ -1,95 +1,64 @@ import debug from "debug"; -import * as async from "async"; -import { parseJsonPatch, Patch, patchSizeSummary, toJsonPatch } from "./patch"; +import { SubEvent } from "sub-events"; +import { parseJsonPatch, Patch, patchSizeSummary, SyncgraphPatchMessage, toJsonPatch } from "./patch"; const log = debug("rdfdbclient"); -export class RdfDbClient { - _patchesToSend: Patch[]; - _lastPingMs: number; - _patchesReceived: number; - _patchesSent: number; - _connectionId: string; - _reconnectionTimeout?: number; - ws?: WebSocket; - _pingLoopTimeout?: number; - // Send and receive patches from rdfdb. Primarily used in SyncedGraph. - // - // What this should do, and does not yet, is keep the graph - // 'coasting' over a reconnect, applying only the diffs from the old - // contents to the new ones once they're in. Then, remove all the - // clearGraph stuff in graph.coffee that doesn't even work right. - // - constructor( - public patchSenderUrl: string, - private clearGraphOnNewConnection: () => void, - private applyPatch: (p: Patch) => void, - private setStatus: (status: string) => void - ) { - this._patchesToSend = []; - this._lastPingMs = -1; - this._patchesReceived = 0; - this._patchesSent = 0; - this._connectionId = "??"; - this.ws = undefined; +class ChannelPinger { + private timeoutId?: number; + private lastMs: number = 0; + constructor(private ws: WebSocket) { + this._pingLoop(); + } + lastPingMs(): number { + return this.lastMs; + } + pong() { + this.lastMs = Date.now() + this.lastMs; + } + _pingLoop() { + if (this.ws.readyState !== this.ws.OPEN) { + return; + } + this.ws.send("PING"); + this.lastMs = -Date.now(); + + if (this.timeoutId != null) { + clearTimeout(this.timeoutId); + } + this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; + } +} - this._newConnection(); +class RdfDbChannel { + // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body + + private ws?: WebSocket = undefined; + private pinger?: ChannelPinger; + private connectionId: string = "none"; // server's name for us + private reconnectTimer?: NodeJS.Timeout = undefined; + private messagesReceived = 0; // (non-ping messages) + private messagesSent = 0; + + newConnection: SubEvent = new SubEvent(); + serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); + statusDisplay: SubEvent = new SubEvent(); + + constructor(public patchSenderUrl: string) { + this.openConnection(); + } + sendMessage(body: string): boolean { + // one try, best effort, true if we think it worked + if (!this.ws || this.ws.readyState !== this.ws.OPEN) { + return false; + } + log("send patch to server, " + body.length + " bytes"); + this.ws.send(body); + return true; } - _updateStatus() { - const conn = (() => { - if (this.ws === undefined) { - return "no"; - } else { - switch (this.ws.readyState) { - case this.ws.CONNECTING: - return "connecting"; - case this.ws.OPEN: - return `open as ${this._connectionId}`; - case this.ws.CLOSING: - return "closing"; - case this.ws.CLOSED: - return "close"; - } - } - })(); - - const ping = this._lastPingMs > 0 ? this._lastPingMs : "..."; - return this.setStatus(`${conn}; \ -${this._patchesReceived} recv; \ -${this._patchesSent} sent; \ -${this._patchesToSend.length} pending; \ -${ping}ms`); - } - - sendPatch(patch: Patch) { - log("queue patch to server ", patchSizeSummary(patch)); - this._patchesToSend.push(patch); - this._updateStatus(); - this._continueSending(); - } - - _newConnection() { - const wsOrWss = window.location.protocol.replace("http", "ws"); - const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; - if (this.ws !== undefined) { - this.ws.close(); - } - this.ws = new WebSocket(fullUrl); - this.ws.onopen = this.onWsOpen.bind(this); - this.ws.onerror = this.onWsError.bind(this); - this.ws.onclose = this.onWsClose.bind(this); - this.ws.onmessage = this._onMessage.bind(this); - } - - private onWsOpen() { - log("new connection to", this.patchSenderUrl); - this._updateStatus(); - this.clearGraphOnNewConnection(); - return this._pingLoop(); - } - - private onWsError(e: Event) { - log("ws error", e); + disconnect() { + // will be followed by an autoconnect + log("disconnect requested"); if (this.ws !== undefined) { const closeHandler = this.ws.onclose?.bind(this.ws); if (!closeHandler) { @@ -99,71 +68,150 @@ export class RdfDbClient { } } - private onWsClose(ev: CloseEvent) { - log("ws close"); - this._updateStatus(); - if (this._reconnectionTimeout !== undefined) { - clearTimeout(this._reconnectionTimeout); + private openConnection() { + const wsOrWss = window.location.protocol.replace("http", "ws"); + const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; + if (this.ws !== undefined) { + this.ws.close(); } - this._reconnectionTimeout = (setTimeout(this._newConnection.bind(this), 1000) as unknown) as number; + this.ws = new WebSocket(fullUrl); + this.ws.onopen = this.onWsOpen.bind(this, this.ws); + this.ws.onerror = this.onWsError.bind(this); + this.ws.onclose = this.onWsClose.bind(this); + this.ws.onmessage = this.onWsMessage.bind(this); + } + + private onWsOpen(ws: WebSocket) { + log("new connection to", this.patchSenderUrl); + this.updateStatus(); + this.newConnection.emit(); + this.pinger = new ChannelPinger(ws); } - _pingLoop() { - if (this.ws && this.ws.readyState === this.ws.OPEN) { - this.ws.send("PING"); - this._lastPingMs = -Date.now(); + private onWsMessage(evt: { data: string }) { + const msg = evt.data; + if (msg === "PONG") { + this.onPong(); + return; + } + this.onJson(msg); + } - if (this._pingLoopTimeout != null) { - clearTimeout(this._pingLoopTimeout); - } - this._pingLoopTimeout = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; + private onPong() { + if (this.pinger) { + this.pinger.pong(); + this.updateStatus(); + } + } + + private onJson(msg: string) { + const input = JSON.parse(msg); + if (input.connectedAs) { + this.connectionId = input.connectedAs; + } else { + this.onPatch(input as SyncgraphPatchMessage); } } - _onMessage(evt: { data: string }) { - const msg = evt.data; - if (msg === "PONG") { - this._lastPingMs = Date.now() + this._lastPingMs; - this._updateStatus(); - return; - } + private onPatch(input: SyncgraphPatchMessage) { + log("patch from server [0]"); + this.serverMessage.emit({ evType: "patch", body: input }); + this.messagesReceived++; + this.updateStatus(); + } - const input = JSON.parse(msg); - if (input.connectedAs) { - this._connectionId = input.connectedAs; - } else { - log("patch from server [0]") - parseJsonPatch(input, this.applyPatch.bind(this)); - this._patchesReceived++; + private onWsError(e: Event) { + log("ws error", e); + this.disconnect(); + this.updateStatus(); + } + + private onWsClose(ev: CloseEvent) { + log("ws close"); + this.updateStatus(); + if (this.reconnectTimer !== undefined) { + clearTimeout(this.reconnectTimer); } - return this._updateStatus(); + this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); } - _continueSending() { - if (this.ws && this.ws.readyState !== this.ws.OPEN) { - setTimeout(this._continueSending.bind(this), 500); - return; - } + private updateStatus() { + const conn = (() => { + if (this.ws === undefined) { + return "no"; + } else { + switch (this.ws.readyState) { + case this.ws.CONNECTING: + return "connecting"; + case this.ws.OPEN: + return `open as ${this.connectionId}`; + case this.ws.CLOSING: + return "closing"; + case this.ws.CLOSED: + return "close"; + } + } + })(); + + const ping = this.pinger ? this.pinger.lastPingMs() : "..."; + this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); + } +} +export class RdfDbClient { + private channel: RdfDbChannel; + _patchesToSend: Patch[]; + // Send and receive patches from rdfdb. Primarily used in SyncedGraph. + // + // What this should do, and does not yet, is keep the graph + // 'coasting' over a reconnect, applying only the diffs from the old + // contents to the new ones once they're in. Then, remove all the + // clearGraph stuff in graph.coffee that doesn't even work right. + // + constructor( + patchSenderUrl: string, + private clearGraphOnNewConnection: () => void, + private applyPatch: (p: Patch) => void, + setStatus: (status: string) => void + ) { + this._patchesToSend = []; + this.channel = new RdfDbChannel(patchSenderUrl); + this.channel.statusDisplay.subscribe((st: string) => { + setStatus(st + `; ${this._patchesToSend.length} pending `); + }); + this.channel.newConnection.subscribe(() => { + this.clearGraphOnNewConnection(); + }); + this.channel.serverMessage.subscribe((m) => { + parseJsonPatch(m.body, this.applyPatch.bind(this)); + }); + } + + sendPatch(patch: Patch) { + log("queue patch to server ", patchSizeSummary(patch)); + this._patchesToSend.push(patch); + this._continueSending(); + } + + disconnect() { + this.channel.disconnect(); + } + + async _continueSending() { // we could call this less often and coalesce patches together to optimize // the dragging cases. + while (this._patchesToSend.length) { + const patch = this._patchesToSend.splice(0, 1)[0]; + const json = await new Promise((res, rej) => { + toJsonPatch(patch, res); + }); + const ret = this.channel.sendMessage(json); + if (!ret) { + setTimeout(this._continueSending.bind(this), 500); - const sendOne = (patch: any, cb: (arg0: any) => any) => { - return toJsonPatch(patch, (json: string) => { - log("send patch to server, " + json.length + " bytes"); - if (!this.ws) { - throw new Error("can't send"); - } - this.ws.send(json); - this._patchesSent++; - this._updateStatus(); - return cb(null); - }); - }; - - return async.eachSeries(this._patchesToSend, sendOne, () => { - this._patchesToSend = []; - return this._updateStatus(); - }); + // this.disconnect() + return; + } + } } }