import debug from "debug"; import { SubEvent } from "sub-events"; import { SyncgraphPatchMessage } from "./patch"; const log = debug("rdfdbclient"); class ChannelPinger { private timeoutId?: number; private lastMs: number = 0; constructor(private ws: WebSocket) { this._pingLoop(); } lastPingMs(): number { return this.lastMs; } pong() { this.lastMs = Date.now() + this.lastMs; } _pingLoop() { if (this.ws.readyState !== this.ws.OPEN) { return; } this.ws.send("PING"); this.lastMs = -Date.now(); if (this.timeoutId != null) { clearTimeout(this.timeoutId); } this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; } } export class RdfDbChannel { // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body private ws?: WebSocket = undefined; private pinger?: ChannelPinger; private connectionId: string = "none"; // server's name for us private reconnectTimer?: NodeJS.Timeout = undefined; private messagesReceived = 0; // (non-ping messages) private messagesSent = 0; newConnection: SubEvent = new SubEvent(); serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); statusDisplay: SubEvent = new SubEvent(); constructor(public patchSenderUrl: string) { this.openConnection(); } sendMessage(body: string): boolean { // one try, best effort, true if we think it worked if (!this.ws || this.ws.readyState !== this.ws.OPEN) { return false; } log("send patch to server, " + body.length + " bytes"); this.ws.send(body); this.messagesSent++; this.updateStatus(); return true; } disconnect(why:string) { // will be followed by an autoconnect log("disconnect requested:", why); if (this.ws !== undefined) { const closeHandler = this.ws.onclose?.bind(this.ws); if (!closeHandler) { throw new Error(); } closeHandler(new CloseEvent("forced")); } } private openConnection() { const wsOrWss = window.location.protocol.replace("http", "ws"); const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; if (this.ws !== undefined) { this.ws.close(); } this.ws = new WebSocket(fullUrl); this.ws.onopen = this.onWsOpen.bind(this, this.ws); this.ws.onerror = this.onWsError.bind(this); this.ws.onclose = this.onWsClose.bind(this); this.ws.onmessage = this.onWsMessage.bind(this); } private onWsOpen(ws: WebSocket) { log("new connection to", this.patchSenderUrl); this.updateStatus(); this.newConnection.emit(); this.pinger = new ChannelPinger(ws); } private onWsMessage(evt: { data: string }) { const msg = evt.data; if (msg === "PONG") { this.onPong(); return; } this.onJson(msg); } private onPong() { if (this.pinger) { this.pinger.pong(); this.updateStatus(); } } private onJson(msg: string) { const input = JSON.parse(msg); if (input.connectedAs) { this.connectionId = input.connectedAs; } else { this.onPatch(input as SyncgraphPatchMessage); } } private onPatch(input: SyncgraphPatchMessage) { log(`patch msg from server`); this.serverMessage.emit({ evType: "patch", body: input }); this.messagesReceived++; this.updateStatus(); } private onWsError(e: Event) { log("ws error", e); this.disconnect("ws error"); this.updateStatus(); } private onWsClose(ev: CloseEvent) { log("ws close"); this.updateStatus(); if (this.reconnectTimer !== undefined) { clearTimeout(this.reconnectTimer); } this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); } private updateStatus() { const conn = (() => { if (this.ws === undefined) { return "no"; } else { switch (this.ws.readyState) { case this.ws.CONNECTING: return "connecting"; case this.ws.OPEN: return `open as ${this.connectionId}`; case this.ws.CLOSING: return "closing"; case this.ws.CLOSED: return "close"; } } })(); const ping = this.pinger ? this.pinger.lastPingMs() : "..."; this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); } }