diff --git a/light9/web/RdfDbChannel.ts b/light9/web/RdfDbChannel.ts new file mode 100644 --- /dev/null +++ b/light9/web/RdfDbChannel.ts @@ -0,0 +1,158 @@ +import debug from "debug"; +import { SubEvent } from "sub-events"; +import { SyncgraphPatchMessage } from "./patch"; +const log = debug("rdfdbclient"); + +class ChannelPinger { + private timeoutId?: number; + private lastMs: number = 0; + constructor(private ws: WebSocket) { + this._pingLoop(); + } + lastPingMs(): number { + return this.lastMs; + } + pong() { + this.lastMs = Date.now() + this.lastMs; + } + _pingLoop() { + if (this.ws.readyState !== this.ws.OPEN) { + return; + } + this.ws.send("PING"); + this.lastMs = -Date.now(); + + if (this.timeoutId != null) { + clearTimeout(this.timeoutId); + } + this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; + } +} + +export class RdfDbChannel { + // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body + private ws?: WebSocket = undefined; + private pinger?: ChannelPinger; + private connectionId: string = "none"; // server's name for us + private reconnectTimer?: NodeJS.Timeout = undefined; + private messagesReceived = 0; // (non-ping messages) + private messagesSent = 0; + + newConnection: SubEvent = new SubEvent(); + serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); + statusDisplay: SubEvent = new SubEvent(); + + constructor(public patchSenderUrl: string) { + this.openConnection(); + } + sendMessage(body: string): boolean { + // one try, best effort, true if we think it worked + if (!this.ws || this.ws.readyState !== this.ws.OPEN) { + return false; + } + log("send patch to server, " + body.length + " bytes"); + this.ws.send(body); + return true; + } + + disconnect() { + // will be followed by an autoconnect + log("disconnect requested"); + if (this.ws !== undefined) { + const closeHandler = this.ws.onclose?.bind(this.ws); + if (!closeHandler) { + throw new Error(); + } + closeHandler(new CloseEvent("forced")); + } + } + + private openConnection() { + const wsOrWss = window.location.protocol.replace("http", "ws"); + const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; + if (this.ws !== undefined) { + this.ws.close(); + } + this.ws = new WebSocket(fullUrl); + this.ws.onopen = this.onWsOpen.bind(this, this.ws); + this.ws.onerror = this.onWsError.bind(this); + this.ws.onclose = this.onWsClose.bind(this); + this.ws.onmessage = this.onWsMessage.bind(this); + } + + private onWsOpen(ws: WebSocket) { + log("new connection to", this.patchSenderUrl); + this.updateStatus(); + this.newConnection.emit(); + this.pinger = new ChannelPinger(ws); + } + + private onWsMessage(evt: { data: string }) { + const msg = evt.data; + if (msg === "PONG") { + this.onPong(); + return; + } + this.onJson(msg); + } + + private onPong() { + if (this.pinger) { + this.pinger.pong(); + this.updateStatus(); + } + } + + private onJson(msg: string) { + const input = JSON.parse(msg); + if (input.connectedAs) { + this.connectionId = input.connectedAs; + } else { + this.onPatch(input as SyncgraphPatchMessage); + } + } + + private onPatch(input: SyncgraphPatchMessage) { + log("patch from server [0]"); + this.serverMessage.emit({ evType: "patch", body: input }); + this.messagesReceived++; + this.updateStatus(); + } + + private onWsError(e: Event) { + log("ws error", e); + this.disconnect(); + this.updateStatus(); + } + + private onWsClose(ev: CloseEvent) { + log("ws close"); + this.updateStatus(); + if (this.reconnectTimer !== undefined) { + clearTimeout(this.reconnectTimer); + } + this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); + } + + private updateStatus() { + const conn = (() => { + if (this.ws === undefined) { + return "no"; + } else { + switch (this.ws.readyState) { + case this.ws.CONNECTING: + return "connecting"; + case this.ws.OPEN: + return `open as ${this.connectionId}`; + case this.ws.CLOSING: + return "closing"; + case this.ws.CLOSED: + return "close"; + } + } + })(); + + const ping = this.pinger ? this.pinger.lastPingMs() : "..."; + this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); + } +} diff --git a/light9/web/rdfdbclient.ts b/light9/web/rdfdbclient.ts --- a/light9/web/rdfdbclient.ts +++ b/light9/web/rdfdbclient.ts @@ -1,162 +1,7 @@ import debug from "debug"; -import { SubEvent } from "sub-events"; -import { parseJsonPatch, Patch, patchSizeSummary, SyncgraphPatchMessage, toJsonPatch } from "./patch"; -const log = debug("rdfdbclient"); - -class ChannelPinger { - private timeoutId?: number; - private lastMs: number = 0; - constructor(private ws: WebSocket) { - this._pingLoop(); - } - lastPingMs(): number { - return this.lastMs; - } - pong() { - this.lastMs = Date.now() + this.lastMs; - } - _pingLoop() { - if (this.ws.readyState !== this.ws.OPEN) { - return; - } - this.ws.send("PING"); - this.lastMs = -Date.now(); - - if (this.timeoutId != null) { - clearTimeout(this.timeoutId); - } - this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; - } -} - -class RdfDbChannel { - // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body - - private ws?: WebSocket = undefined; - private pinger?: ChannelPinger; - private connectionId: string = "none"; // server's name for us - private reconnectTimer?: NodeJS.Timeout = undefined; - private messagesReceived = 0; // (non-ping messages) - private messagesSent = 0; - - newConnection: SubEvent = new SubEvent(); - serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); - statusDisplay: SubEvent = new SubEvent(); - - constructor(public patchSenderUrl: string) { - this.openConnection(); - } - sendMessage(body: string): boolean { - // one try, best effort, true if we think it worked - if (!this.ws || this.ws.readyState !== this.ws.OPEN) { - return false; - } - log("send patch to server, " + body.length + " bytes"); - this.ws.send(body); - return true; - } - - disconnect() { - // will be followed by an autoconnect - log("disconnect requested"); - if (this.ws !== undefined) { - const closeHandler = this.ws.onclose?.bind(this.ws); - if (!closeHandler) { - throw new Error(); - } - closeHandler(new CloseEvent("forced")); - } - } - - private openConnection() { - const wsOrWss = window.location.protocol.replace("http", "ws"); - const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; - if (this.ws !== undefined) { - this.ws.close(); - } - this.ws = new WebSocket(fullUrl); - this.ws.onopen = this.onWsOpen.bind(this, this.ws); - this.ws.onerror = this.onWsError.bind(this); - this.ws.onclose = this.onWsClose.bind(this); - this.ws.onmessage = this.onWsMessage.bind(this); - } - - private onWsOpen(ws: WebSocket) { - log("new connection to", this.patchSenderUrl); - this.updateStatus(); - this.newConnection.emit(); - this.pinger = new ChannelPinger(ws); - } - - private onWsMessage(evt: { data: string }) { - const msg = evt.data; - if (msg === "PONG") { - this.onPong(); - return; - } - this.onJson(msg); - } - - private onPong() { - if (this.pinger) { - this.pinger.pong(); - this.updateStatus(); - } - } - - private onJson(msg: string) { - const input = JSON.parse(msg); - if (input.connectedAs) { - this.connectionId = input.connectedAs; - } else { - this.onPatch(input as SyncgraphPatchMessage); - } - } - - private onPatch(input: SyncgraphPatchMessage) { - log("patch from server [0]"); - this.serverMessage.emit({ evType: "patch", body: input }); - this.messagesReceived++; - this.updateStatus(); - } - - private onWsError(e: Event) { - log("ws error", e); - this.disconnect(); - this.updateStatus(); - } - - private onWsClose(ev: CloseEvent) { - log("ws close"); - this.updateStatus(); - if (this.reconnectTimer !== undefined) { - clearTimeout(this.reconnectTimer); - } - this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); - } - - private updateStatus() { - const conn = (() => { - if (this.ws === undefined) { - return "no"; - } else { - switch (this.ws.readyState) { - case this.ws.CONNECTING: - return "connecting"; - case this.ws.OPEN: - return `open as ${this.connectionId}`; - case this.ws.CLOSING: - return "closing"; - case this.ws.CLOSED: - return "close"; - } - } - })(); - - const ping = this.pinger ? this.pinger.lastPingMs() : "..."; - this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); - } -} +import { parseJsonPatch, Patch, patchSizeSummary, toJsonPatch } from "./patch"; +import { RdfDbChannel } from "./RdfDbChannel"; +export const log = debug("rdfdbclient"); export class RdfDbClient { private channel: RdfDbChannel;