changeset 2255:7c94348d6127

big refactor of RdfDbClient, separating the websocket layer
author drewp@bigasterisk.com
date Sat, 27 May 2023 23:05:04 -0700
parents b0be0315426d
children eb34653d315d
files light9/web/rdfdbclient.ts
diffstat 1 files changed, 185 insertions(+), 137 deletions(-) [+]
line wrap: on
line diff
--- a/light9/web/rdfdbclient.ts	Sat May 27 17:57:12 2023 -0700
+++ b/light9/web/rdfdbclient.ts	Sat May 27 23:05:04 2023 -0700
@@ -1,95 +1,64 @@
 import debug from "debug";
-import * as async from "async";
-import { parseJsonPatch, Patch, patchSizeSummary, toJsonPatch } from "./patch";
+import { SubEvent } from "sub-events";
+import { parseJsonPatch, Patch, patchSizeSummary, SyncgraphPatchMessage, toJsonPatch } from "./patch";
 const log = debug("rdfdbclient");
 
-export class RdfDbClient {
-  _patchesToSend: Patch[];
-  _lastPingMs: number;
-  _patchesReceived: number;
-  _patchesSent: number;
-  _connectionId: string;
-  _reconnectionTimeout?: number;
-  ws?: WebSocket;
-  _pingLoopTimeout?: number;
-  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
-  //
-  // What this should do, and does not yet, is keep the graph
-  // 'coasting' over a reconnect, applying only the diffs from the old
-  // contents to the new ones once they're in. Then, remove all the
-  // clearGraph stuff in graph.coffee that doesn't even work right.
-  //
-  constructor(
-    public patchSenderUrl: string,
-    private clearGraphOnNewConnection: () => void,
-    private applyPatch: (p: Patch) => void,
-    private setStatus: (status: string) => void
-  ) {
-    this._patchesToSend = [];
-    this._lastPingMs = -1;
-    this._patchesReceived = 0;
-    this._patchesSent = 0;
-    this._connectionId = "??";
-    this.ws = undefined;
+class ChannelPinger {
+  private timeoutId?: number;
+  private lastMs: number = 0;
+  constructor(private ws: WebSocket) {
+    this._pingLoop();
+  }
+  lastPingMs(): number {
+    return this.lastMs;
+  }
+  pong() {
+    this.lastMs = Date.now() + this.lastMs;
+  }
+  _pingLoop() {
+    if (this.ws.readyState !== this.ws.OPEN) {
+      return;
+    }
+    this.ws.send("PING");
+    this.lastMs = -Date.now();
+
+    if (this.timeoutId != null) {
+      clearTimeout(this.timeoutId);
+    }
+    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
+  }
+}
 
-    this._newConnection();
+class RdfDbChannel {
+  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
+
+  private ws?: WebSocket = undefined;
+  private pinger?: ChannelPinger;
+  private connectionId: string = "none"; // server's name for us
+  private reconnectTimer?: NodeJS.Timeout = undefined;
+  private messagesReceived = 0; // (non-ping messages)
+  private messagesSent = 0;
+
+  newConnection: SubEvent<void> = new SubEvent();
+  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
+  statusDisplay: SubEvent<string> = new SubEvent();
+
+  constructor(public patchSenderUrl: string) {
+    this.openConnection();
+  }
+  sendMessage(body: string): boolean {
+    // one try, best effort, true if we think it worked
+    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
+      return false;
+    }
+    log("send patch to server, " + body.length + " bytes");
+    this.ws.send(body);
+    return true;
   }
 
-  _updateStatus() {
-    const conn = (() => {
-      if (this.ws === undefined) {
-        return "no";
-      } else {
-        switch (this.ws.readyState) {
-          case this.ws.CONNECTING:
-            return "connecting";
-          case this.ws.OPEN:
-            return `open as ${this._connectionId}`;
-          case this.ws.CLOSING:
-            return "closing";
-          case this.ws.CLOSED:
-            return "close";
-        }
-      }
-    })();
-
-    const ping = this._lastPingMs > 0 ? this._lastPingMs : "...";
-    return this.setStatus(`${conn}; \
-${this._patchesReceived} recv; \
-${this._patchesSent} sent; \
-${this._patchesToSend.length} pending; \
-${ping}ms`);
-  }
-
-  sendPatch(patch: Patch) {
-    log("queue patch to server ", patchSizeSummary(patch));
-    this._patchesToSend.push(patch);
-    this._updateStatus();
-    this._continueSending();
-  }
-
-  _newConnection() {
-    const wsOrWss = window.location.protocol.replace("http", "ws");
-    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
-    if (this.ws !== undefined) {
-      this.ws.close();
-    }
-    this.ws = new WebSocket(fullUrl);
-    this.ws.onopen = this.onWsOpen.bind(this);
-    this.ws.onerror = this.onWsError.bind(this);
-    this.ws.onclose = this.onWsClose.bind(this);
-    this.ws.onmessage = this._onMessage.bind(this);
-  }
-
-  private onWsOpen() {
-    log("new connection to", this.patchSenderUrl);
-    this._updateStatus();
-    this.clearGraphOnNewConnection();
-    return this._pingLoop();
-  }
-
-  private onWsError(e: Event) {
-    log("ws error", e);
+  disconnect() {
+    // will be followed by an autoconnect
+    log("disconnect requested");
     if (this.ws !== undefined) {
       const closeHandler = this.ws.onclose?.bind(this.ws);
       if (!closeHandler) {
@@ -99,71 +68,150 @@
     }
   }
 
-  private onWsClose(ev: CloseEvent) {
-    log("ws close");
-    this._updateStatus();
-    if (this._reconnectionTimeout !== undefined) {
-      clearTimeout(this._reconnectionTimeout);
+  private openConnection() {
+    const wsOrWss = window.location.protocol.replace("http", "ws");
+    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
+    if (this.ws !== undefined) {
+      this.ws.close();
     }
-    this._reconnectionTimeout = (setTimeout(this._newConnection.bind(this), 1000) as unknown) as number;
+    this.ws = new WebSocket(fullUrl);
+    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
+    this.ws.onerror = this.onWsError.bind(this);
+    this.ws.onclose = this.onWsClose.bind(this);
+    this.ws.onmessage = this.onWsMessage.bind(this);
+  }
+
+  private onWsOpen(ws: WebSocket) {
+    log("new connection to", this.patchSenderUrl);
+    this.updateStatus();
+    this.newConnection.emit();
+    this.pinger = new ChannelPinger(ws);
   }
 
-  _pingLoop() {
-    if (this.ws && this.ws.readyState === this.ws.OPEN) {
-      this.ws.send("PING");
-      this._lastPingMs = -Date.now();
+  private onWsMessage(evt: { data: string }) {
+    const msg = evt.data;
+    if (msg === "PONG") {
+      this.onPong();
+      return;
+    }
+    this.onJson(msg);
+  }
 
-      if (this._pingLoopTimeout != null) {
-        clearTimeout(this._pingLoopTimeout);
-      }
-      this._pingLoopTimeout = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
+  private onPong() {
+    if (this.pinger) {
+      this.pinger.pong();
+      this.updateStatus();
+    }
+  }
+
+  private onJson(msg: string) {
+    const input = JSON.parse(msg);
+    if (input.connectedAs) {
+      this.connectionId = input.connectedAs;
+    } else {
+      this.onPatch(input as SyncgraphPatchMessage);
     }
   }
 
-  _onMessage(evt: { data: string }) {
-    const msg = evt.data;
-    if (msg === "PONG") {
-      this._lastPingMs = Date.now() + this._lastPingMs;
-      this._updateStatus();
-      return;
-    }
+  private onPatch(input: SyncgraphPatchMessage) {
+    log("patch from server [0]");
+    this.serverMessage.emit({ evType: "patch", body: input });
+    this.messagesReceived++;
+    this.updateStatus();
+  }
 
-    const input = JSON.parse(msg);
-    if (input.connectedAs) {
-      this._connectionId = input.connectedAs;
-    } else {
-      log("patch from server [0]")
-      parseJsonPatch(input, this.applyPatch.bind(this));
-      this._patchesReceived++;
+  private onWsError(e: Event) {
+    log("ws error", e);
+    this.disconnect();
+    this.updateStatus();
+  }
+
+  private onWsClose(ev: CloseEvent) {
+    log("ws close");
+    this.updateStatus();
+    if (this.reconnectTimer !== undefined) {
+      clearTimeout(this.reconnectTimer);
     }
-    return this._updateStatus();
+    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
   }
 
-  _continueSending() {
-    if (this.ws && this.ws.readyState !== this.ws.OPEN) {
-      setTimeout(this._continueSending.bind(this), 500);
-      return;
-    }
+  private updateStatus() {
+    const conn = (() => {
+      if (this.ws === undefined) {
+        return "no";
+      } else {
+        switch (this.ws.readyState) {
+          case this.ws.CONNECTING:
+            return "connecting";
+          case this.ws.OPEN:
+            return `open as ${this.connectionId}`;
+          case this.ws.CLOSING:
+            return "closing";
+          case this.ws.CLOSED:
+            return "close";
+        }
+      }
+    })();
+
+    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
+    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
+  }
+}
 
+export class RdfDbClient {
+  private channel: RdfDbChannel;
+  _patchesToSend: Patch[];
+  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
+  //
+  // What this should do, and does not yet, is keep the graph
+  // 'coasting' over a reconnect, applying only the diffs from the old
+  // contents to the new ones once they're in. Then, remove all the
+  // clearGraph stuff in graph.coffee that doesn't even work right.
+  //
+  constructor(
+    patchSenderUrl: string,
+    private clearGraphOnNewConnection: () => void,
+    private applyPatch: (p: Patch) => void,
+    setStatus: (status: string) => void
+  ) {
+    this._patchesToSend = [];
+    this.channel = new RdfDbChannel(patchSenderUrl);
+    this.channel.statusDisplay.subscribe((st: string) => {
+      setStatus(st + `; ${this._patchesToSend.length} pending `);
+    });
+    this.channel.newConnection.subscribe(() => {
+      this.clearGraphOnNewConnection();
+    });
+    this.channel.serverMessage.subscribe((m) => {
+      parseJsonPatch(m.body, this.applyPatch.bind(this));
+    });
+  }
+
+  sendPatch(patch: Patch) {
+    log("queue patch to server ", patchSizeSummary(patch));
+    this._patchesToSend.push(patch);
+    this._continueSending();
+  }
+
+  disconnect() {
+    this.channel.disconnect();
+  }
+
+  async _continueSending() {
     // we could call this less often and coalesce patches together to optimize
     // the dragging cases.
+    while (this._patchesToSend.length) {
+      const patch = this._patchesToSend.splice(0, 1)[0];
+      const json = await new Promise<string>((res, rej) => {
+        toJsonPatch(patch, res);
+      });
+      const ret = this.channel.sendMessage(json);
+      if (!ret) {
+        setTimeout(this._continueSending.bind(this), 500);
 
-    const sendOne = (patch: any, cb: (arg0: any) => any) => {
-      return toJsonPatch(patch, (json: string) => {
-        log("send patch to server, " + json.length + " bytes");
-        if (!this.ws) {
-          throw new Error("can't send");
-        }
-        this.ws.send(json);
-        this._patchesSent++;
-        this._updateStatus();
-        return cb(null);
-      });
-    };
-
-    return async.eachSeries(this._patchesToSend, sendOne, () => {
-      this._patchesToSend = [];
-      return this._updateStatus();
-    });
+        // this.disconnect()
+        return;
+      }
+    }
   }
 }