diff web/RdfDbChannel.ts @ 2376:4556eebe5d73

topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
author drewp@bigasterisk.com
date Sun, 12 May 2024 19:02:10 -0700
parents light9/web/RdfDbChannel.ts@cdfd2901918a
children ac55319a2eac
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/web/RdfDbChannel.ts	Sun May 12 19:02:10 2024 -0700
@@ -0,0 +1,160 @@
+import debug from "debug";
+import { SubEvent } from "sub-events";
+import { SyncgraphPatchMessage } from "./patch";
+const log = debug("rdfdbclient");
+
+class ChannelPinger {
+  private timeoutId?: number;
+  private lastMs: number = 0;
+  constructor(private ws: WebSocket) {
+    this._pingLoop();
+  }
+  lastPingMs(): number {
+    return this.lastMs;
+  }
+  pong() {
+    this.lastMs = Date.now() + this.lastMs;
+  }
+  _pingLoop() {
+    if (this.ws.readyState !== this.ws.OPEN) {
+      return;
+    }
+    this.ws.send("PING");
+    this.lastMs = -Date.now();
+
+    if (this.timeoutId != null) {
+      clearTimeout(this.timeoutId);
+    }
+    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
+  }
+}
+
+export class RdfDbChannel {
+  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
+  private ws?: WebSocket = undefined;
+  private pinger?: ChannelPinger;
+  private connectionId: string = "none"; // server's name for us
+  private reconnectTimer?: NodeJS.Timeout = undefined;
+  private messagesReceived = 0; // (non-ping messages)
+  private messagesSent = 0;
+
+  newConnection: SubEvent<void> = new SubEvent();
+  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
+  statusDisplay: SubEvent<string> = new SubEvent();
+
+  constructor(public patchSenderUrl: string) {
+    this.openConnection();
+  }
+  sendMessage(body: string): boolean {
+    // one try, best effort, true if we think it worked
+    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
+      return false;
+    }
+    log("send patch to server, " + body.length + " bytes");
+    this.ws.send(body);
+    this.messagesSent++;
+    this.updateStatus();
+    return true;
+  }
+
+  disconnect(why:string) {
+    // will be followed by an autoconnect
+    log("disconnect requested:", why);
+    if (this.ws !== undefined) {
+      const closeHandler = this.ws.onclose?.bind(this.ws);
+      if (!closeHandler) {
+        throw new Error();
+      }
+      closeHandler(new CloseEvent("forced"));
+    }
+  }
+
+  private openConnection() {
+    const wsOrWss = window.location.protocol.replace("http", "ws");
+    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
+    if (this.ws !== undefined) {
+      this.ws.close();
+    }
+    this.ws = new WebSocket(fullUrl);
+    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
+    this.ws.onerror = this.onWsError.bind(this);
+    this.ws.onclose = this.onWsClose.bind(this);
+    this.ws.onmessage = this.onWsMessage.bind(this);
+  }
+
+  private onWsOpen(ws: WebSocket) {
+    log("new connection to", this.patchSenderUrl);
+    this.updateStatus();
+    this.newConnection.emit();
+    this.pinger = new ChannelPinger(ws);
+  }
+
+  private onWsMessage(evt: { data: string }) {
+    const msg = evt.data;
+    if (msg === "PONG") {
+      this.onPong();
+      return;
+    }
+    this.onJson(msg);
+  }
+
+  private onPong() {
+    if (this.pinger) {
+      this.pinger.pong();
+      this.updateStatus();
+    }
+  }
+
+  private onJson(msg: string) {
+    const input = JSON.parse(msg);
+    if (input.connectedAs) {
+      this.connectionId = input.connectedAs;
+    } else {
+      this.onPatch(input as SyncgraphPatchMessage);
+    }
+  }
+
+  private onPatch(input: SyncgraphPatchMessage) {
+    log(`patch msg from server`);
+    this.serverMessage.emit({ evType: "patch", body: input });
+    this.messagesReceived++;
+    this.updateStatus();
+  }
+
+  private onWsError(e: Event) {
+    log("ws error", e);
+    this.disconnect("ws error");
+    this.updateStatus();
+  }
+
+  private onWsClose(ev: CloseEvent) {
+    log("ws close");
+    this.updateStatus();
+    if (this.reconnectTimer !== undefined) {
+      clearTimeout(this.reconnectTimer);
+    }
+    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
+  }
+
+  private updateStatus() {
+    const conn = (() => {
+      if (this.ws === undefined) {
+        return "no";
+      } else {
+        switch (this.ws.readyState) {
+          case this.ws.CONNECTING:
+            return "connecting";
+          case this.ws.OPEN:
+            return `open as ${this.connectionId}`;
+          case this.ws.CLOSING:
+            return "closing";
+          case this.ws.CLOSED:
+            return "close";
+        }
+      }
+    })();
+
+    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
+    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
+  }
+}