Mercurial > code > home > repos > light9
diff web/RdfDbChannel.ts @ 2376:4556eebe5d73
topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
author | drewp@bigasterisk.com |
---|---|
date | Sun, 12 May 2024 19:02:10 -0700 |
parents | light9/web/RdfDbChannel.ts@cdfd2901918a |
children | ac55319a2eac |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/web/RdfDbChannel.ts Sun May 12 19:02:10 2024 -0700 @@ -0,0 +1,160 @@ +import debug from "debug"; +import { SubEvent } from "sub-events"; +import { SyncgraphPatchMessage } from "./patch"; +const log = debug("rdfdbclient"); + +class ChannelPinger { + private timeoutId?: number; + private lastMs: number = 0; + constructor(private ws: WebSocket) { + this._pingLoop(); + } + lastPingMs(): number { + return this.lastMs; + } + pong() { + this.lastMs = Date.now() + this.lastMs; + } + _pingLoop() { + if (this.ws.readyState !== this.ws.OPEN) { + return; + } + this.ws.send("PING"); + this.lastMs = -Date.now(); + + if (this.timeoutId != null) { + clearTimeout(this.timeoutId); + } + this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; + } +} + +export class RdfDbChannel { + // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body + private ws?: WebSocket = undefined; + private pinger?: ChannelPinger; + private connectionId: string = "none"; // server's name for us + private reconnectTimer?: NodeJS.Timeout = undefined; + private messagesReceived = 0; // (non-ping messages) + private messagesSent = 0; + + newConnection: SubEvent<void> = new SubEvent(); + serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); + statusDisplay: SubEvent<string> = new SubEvent(); + + constructor(public patchSenderUrl: string) { + this.openConnection(); + } + sendMessage(body: string): boolean { + // one try, best effort, true if we think it worked + if (!this.ws || this.ws.readyState !== this.ws.OPEN) { + return false; + } + log("send patch to server, " + body.length + " bytes"); + this.ws.send(body); + this.messagesSent++; + this.updateStatus(); + return true; + } + + disconnect(why:string) { + // will be followed by an autoconnect + log("disconnect requested:", why); + if (this.ws !== undefined) { + const closeHandler = this.ws.onclose?.bind(this.ws); + if (!closeHandler) { + throw new Error(); + } + closeHandler(new CloseEvent("forced")); + } + } + + private openConnection() { + const wsOrWss = window.location.protocol.replace("http", "ws"); + const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; + if (this.ws !== undefined) { + this.ws.close(); + } + this.ws = new WebSocket(fullUrl); + this.ws.onopen = this.onWsOpen.bind(this, this.ws); + this.ws.onerror = this.onWsError.bind(this); + this.ws.onclose = this.onWsClose.bind(this); + this.ws.onmessage = this.onWsMessage.bind(this); + } + + private onWsOpen(ws: WebSocket) { + log("new connection to", this.patchSenderUrl); + this.updateStatus(); + this.newConnection.emit(); + this.pinger = new ChannelPinger(ws); + } + + private onWsMessage(evt: { data: string }) { + const msg = evt.data; + if (msg === "PONG") { + this.onPong(); + return; + } + this.onJson(msg); + } + + private onPong() { + if (this.pinger) { + this.pinger.pong(); + this.updateStatus(); + } + } + + private onJson(msg: string) { + const input = JSON.parse(msg); + if (input.connectedAs) { + this.connectionId = input.connectedAs; + } else { + this.onPatch(input as SyncgraphPatchMessage); + } + } + + private onPatch(input: SyncgraphPatchMessage) { + log(`patch msg from server`); + this.serverMessage.emit({ evType: "patch", body: input }); + this.messagesReceived++; + this.updateStatus(); + } + + private onWsError(e: Event) { + log("ws error", e); + this.disconnect("ws error"); + this.updateStatus(); + } + + private onWsClose(ev: CloseEvent) { + log("ws close"); + this.updateStatus(); + if (this.reconnectTimer !== undefined) { + clearTimeout(this.reconnectTimer); + } + this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); + } + + private updateStatus() { + const conn = (() => { + if (this.ws === undefined) { + return "no"; + } else { + switch (this.ws.readyState) { + case this.ws.CONNECTING: + return "connecting"; + case this.ws.OPEN: + return `open as ${this.connectionId}`; + case this.ws.CLOSING: + return "closing"; + case this.ws.CLOSED: + return "close"; + } + } + })(); + + const ping = this.pinger ? this.pinger.lastPingMs() : "..."; + this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); + } +}