Changeset - 7c94348d6127
[Not reviewed]
default
0 1 0
drewp@bigasterisk.com - 20 months ago 2023-05-28 06:05:04
drewp@bigasterisk.com
big refactor of RdfDbClient, separating the websocket layer
1 file changed with 185 insertions and 137 deletions:
0 comments (0 inline, 0 general)
light9/web/rdfdbclient.ts
Show inline comments
 
import debug from "debug";
 
import * as async from "async";
 
import { parseJsonPatch, Patch, patchSizeSummary, toJsonPatch } from "./patch";
 
import { SubEvent } from "sub-events";
 
import { parseJsonPatch, Patch, patchSizeSummary, SyncgraphPatchMessage, toJsonPatch } from "./patch";
 
const log = debug("rdfdbclient");
 

	
 
export class RdfDbClient {
 
  _patchesToSend: Patch[];
 
  _lastPingMs: number;
 
  _patchesReceived: number;
 
  _patchesSent: number;
 
  _connectionId: string;
 
  _reconnectionTimeout?: number;
 
  ws?: WebSocket;
 
  _pingLoopTimeout?: number;
 
  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
 
  //
 
  // What this should do, and does not yet, is keep the graph
 
  // 'coasting' over a reconnect, applying only the diffs from the old
 
  // contents to the new ones once they're in. Then, remove all the
 
  // clearGraph stuff in graph.coffee that doesn't even work right.
 
  //
 
  constructor(
 
    public patchSenderUrl: string,
 
    private clearGraphOnNewConnection: () => void,
 
    private applyPatch: (p: Patch) => void,
 
    private setStatus: (status: string) => void
 
  ) {
 
    this._patchesToSend = [];
 
    this._lastPingMs = -1;
 
    this._patchesReceived = 0;
 
    this._patchesSent = 0;
 
    this._connectionId = "??";
 
    this.ws = undefined;
 
class ChannelPinger {
 
  private timeoutId?: number;
 
  private lastMs: number = 0;
 
  constructor(private ws: WebSocket) {
 
    this._pingLoop();
 
  }
 
  lastPingMs(): number {
 
    return this.lastMs;
 
  }
 
  pong() {
 
    this.lastMs = Date.now() + this.lastMs;
 
  }
 
  _pingLoop() {
 
    if (this.ws.readyState !== this.ws.OPEN) {
 
      return;
 
    }
 
    this.ws.send("PING");
 
    this.lastMs = -Date.now();
 

	
 
    if (this.timeoutId != null) {
 
      clearTimeout(this.timeoutId);
 
    }
 
    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
 
  }
 
}
 

	
 
    this._newConnection();
 
class RdfDbChannel {
 
  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
 

	
 
  private ws?: WebSocket = undefined;
 
  private pinger?: ChannelPinger;
 
  private connectionId: string = "none"; // server's name for us
 
  private reconnectTimer?: NodeJS.Timeout = undefined;
 
  private messagesReceived = 0; // (non-ping messages)
 
  private messagesSent = 0;
 

	
 
  newConnection: SubEvent<void> = new SubEvent();
 
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
 
  statusDisplay: SubEvent<string> = new SubEvent();
 

	
 
  constructor(public patchSenderUrl: string) {
 
    this.openConnection();
 
  }
 
  sendMessage(body: string): boolean {
 
    // one try, best effort, true if we think it worked
 
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
 
      return false;
 
    }
 
    log("send patch to server, " + body.length + " bytes");
 
    this.ws.send(body);
 
    return true;
 
  }
 

	
 
  _updateStatus() {
 
    const conn = (() => {
 
      if (this.ws === undefined) {
 
        return "no";
 
      } else {
 
        switch (this.ws.readyState) {
 
          case this.ws.CONNECTING:
 
            return "connecting";
 
          case this.ws.OPEN:
 
            return `open as ${this._connectionId}`;
 
          case this.ws.CLOSING:
 
            return "closing";
 
          case this.ws.CLOSED:
 
            return "close";
 
        }
 
      }
 
    })();
 

	
 
    const ping = this._lastPingMs > 0 ? this._lastPingMs : "...";
 
    return this.setStatus(`${conn}; \
 
${this._patchesReceived} recv; \
 
${this._patchesSent} sent; \
 
${this._patchesToSend.length} pending; \
 
${ping}ms`);
 
  }
 

	
 
  sendPatch(patch: Patch) {
 
    log("queue patch to server ", patchSizeSummary(patch));
 
    this._patchesToSend.push(patch);
 
    this._updateStatus();
 
    this._continueSending();
 
  }
 

	
 
  _newConnection() {
 
    const wsOrWss = window.location.protocol.replace("http", "ws");
 
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
 
    if (this.ws !== undefined) {
 
      this.ws.close();
 
    }
 
    this.ws = new WebSocket(fullUrl);
 
    this.ws.onopen = this.onWsOpen.bind(this);
 
    this.ws.onerror = this.onWsError.bind(this);
 
    this.ws.onclose = this.onWsClose.bind(this);
 
    this.ws.onmessage = this._onMessage.bind(this);
 
  }
 

	
 
  private onWsOpen() {
 
    log("new connection to", this.patchSenderUrl);
 
    this._updateStatus();
 
    this.clearGraphOnNewConnection();
 
    return this._pingLoop();
 
  }
 

	
 
  private onWsError(e: Event) {
 
    log("ws error", e);
 
  disconnect() {
 
    // will be followed by an autoconnect
 
    log("disconnect requested");
 
    if (this.ws !== undefined) {
 
      const closeHandler = this.ws.onclose?.bind(this.ws);
 
      if (!closeHandler) {
 
        throw new Error();
 
      }
 
      closeHandler(new CloseEvent("forced"));
 
    }
 
  }
 

	
 
  private onWsClose(ev: CloseEvent) {
 
    log("ws close");
 
    this._updateStatus();
 
    if (this._reconnectionTimeout !== undefined) {
 
      clearTimeout(this._reconnectionTimeout);
 
  private openConnection() {
 
    const wsOrWss = window.location.protocol.replace("http", "ws");
 
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
 
    if (this.ws !== undefined) {
 
      this.ws.close();
 
    }
 
    this._reconnectionTimeout = (setTimeout(this._newConnection.bind(this), 1000) as unknown) as number;
 
    this.ws = new WebSocket(fullUrl);
 
    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
 
    this.ws.onerror = this.onWsError.bind(this);
 
    this.ws.onclose = this.onWsClose.bind(this);
 
    this.ws.onmessage = this.onWsMessage.bind(this);
 
  }
 

	
 
  private onWsOpen(ws: WebSocket) {
 
    log("new connection to", this.patchSenderUrl);
 
    this.updateStatus();
 
    this.newConnection.emit();
 
    this.pinger = new ChannelPinger(ws);
 
  }
 

	
 
  _pingLoop() {
 
    if (this.ws && this.ws.readyState === this.ws.OPEN) {
 
      this.ws.send("PING");
 
      this._lastPingMs = -Date.now();
 
  private onWsMessage(evt: { data: string }) {
 
    const msg = evt.data;
 
    if (msg === "PONG") {
 
      this.onPong();
 
      return;
 
    }
 
    this.onJson(msg);
 
  }
 

	
 
      if (this._pingLoopTimeout != null) {
 
        clearTimeout(this._pingLoopTimeout);
 
      }
 
      this._pingLoopTimeout = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
 
  private onPong() {
 
    if (this.pinger) {
 
      this.pinger.pong();
 
      this.updateStatus();
 
    }
 
  }
 

	
 
  private onJson(msg: string) {
 
    const input = JSON.parse(msg);
 
    if (input.connectedAs) {
 
      this.connectionId = input.connectedAs;
 
    } else {
 
      this.onPatch(input as SyncgraphPatchMessage);
 
    }
 
  }
 

	
 
  _onMessage(evt: { data: string }) {
 
    const msg = evt.data;
 
    if (msg === "PONG") {
 
      this._lastPingMs = Date.now() + this._lastPingMs;
 
      this._updateStatus();
 
      return;
 
    }
 
  private onPatch(input: SyncgraphPatchMessage) {
 
    log("patch from server [0]");
 
    this.serverMessage.emit({ evType: "patch", body: input });
 
    this.messagesReceived++;
 
    this.updateStatus();
 
  }
 

	
 
    const input = JSON.parse(msg);
 
    if (input.connectedAs) {
 
      this._connectionId = input.connectedAs;
 
    } else {
 
      log("patch from server [0]")
 
      parseJsonPatch(input, this.applyPatch.bind(this));
 
      this._patchesReceived++;
 
  private onWsError(e: Event) {
 
    log("ws error", e);
 
    this.disconnect();
 
    this.updateStatus();
 
  }
 

	
 
  private onWsClose(ev: CloseEvent) {
 
    log("ws close");
 
    this.updateStatus();
 
    if (this.reconnectTimer !== undefined) {
 
      clearTimeout(this.reconnectTimer);
 
    }
 
    return this._updateStatus();
 
    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
 
  }
 

	
 
  _continueSending() {
 
    if (this.ws && this.ws.readyState !== this.ws.OPEN) {
 
      setTimeout(this._continueSending.bind(this), 500);
 
      return;
 
    }
 
  private updateStatus() {
 
    const conn = (() => {
 
      if (this.ws === undefined) {
 
        return "no";
 
      } else {
 
        switch (this.ws.readyState) {
 
          case this.ws.CONNECTING:
 
            return "connecting";
 
          case this.ws.OPEN:
 
            return `open as ${this.connectionId}`;
 
          case this.ws.CLOSING:
 
            return "closing";
 
          case this.ws.CLOSED:
 
            return "close";
 
        }
 
      }
 
    })();
 

	
 
    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
 
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
 
  }
 
}
 

	
 
export class RdfDbClient {
 
  private channel: RdfDbChannel;
 
  _patchesToSend: Patch[];
 
  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
 
  //
 
  // What this should do, and does not yet, is keep the graph
 
  // 'coasting' over a reconnect, applying only the diffs from the old
 
  // contents to the new ones once they're in. Then, remove all the
 
  // clearGraph stuff in graph.coffee that doesn't even work right.
 
  //
 
  constructor(
 
    patchSenderUrl: string,
 
    private clearGraphOnNewConnection: () => void,
 
    private applyPatch: (p: Patch) => void,
 
    setStatus: (status: string) => void
 
  ) {
 
    this._patchesToSend = [];
 
    this.channel = new RdfDbChannel(patchSenderUrl);
 
    this.channel.statusDisplay.subscribe((st: string) => {
 
      setStatus(st + `; ${this._patchesToSend.length} pending `);
 
    });
 
    this.channel.newConnection.subscribe(() => {
 
      this.clearGraphOnNewConnection();
 
    });
 
    this.channel.serverMessage.subscribe((m) => {
 
      parseJsonPatch(m.body, this.applyPatch.bind(this));
 
    });
 
  }
 

	
 
  sendPatch(patch: Patch) {
 
    log("queue patch to server ", patchSizeSummary(patch));
 
    this._patchesToSend.push(patch);
 
    this._continueSending();
 
  }
 

	
 
  disconnect() {
 
    this.channel.disconnect();
 
  }
 

	
 
  async _continueSending() {
 
    // we could call this less often and coalesce patches together to optimize
 
    // the dragging cases.
 
    while (this._patchesToSend.length) {
 
      const patch = this._patchesToSend.splice(0, 1)[0];
 
      const json = await new Promise<string>((res, rej) => {
 
        toJsonPatch(patch, res);
 
      });
 
      const ret = this.channel.sendMessage(json);
 
      if (!ret) {
 
        setTimeout(this._continueSending.bind(this), 500);
 

	
 
    const sendOne = (patch: any, cb: (arg0: any) => any) => {
 
      return toJsonPatch(patch, (json: string) => {
 
        log("send patch to server, " + json.length + " bytes");
 
        if (!this.ws) {
 
          throw new Error("can't send");
 
        }
 
        this.ws.send(json);
 
        this._patchesSent++;
 
        this._updateStatus();
 
        return cb(null);
 
      });
 
    };
 

	
 
    return async.eachSeries(this._patchesToSend, sendOne, () => {
 
      this._patchesToSend = [];
 
      return this._updateStatus();
 
    });
 
        // this.disconnect()
 
        return;
 
      }
 
    }
 
  }
 
}
0 comments (0 inline, 0 general)