Files @ 6f023afd6c16
Branch filter:

Location: light9/web/RdfDbChannel.ts

drewp@bigasterisk.com
dead code
import debug from "debug";
import { SubEvent } from "sub-events";
import { SyncgraphPatchMessage } from "./patch";
const log = debug("rdfdbclient");

class ChannelPinger {
  private timeoutId?: number;
  private lastMs: number = 0;
  constructor(private ws: WebSocket) {
    this._pingLoop();
  }
  lastPingMs(): number {
    return this.lastMs;
  }
  pong() {
    this.lastMs = Date.now() + this.lastMs;
  }
  _pingLoop() {
    if (this.ws.readyState !== this.ws.OPEN) {
      return;
    }
    this.ws.send("PING");
    this.lastMs = -Date.now();

    if (this.timeoutId != null) {
      clearTimeout(this.timeoutId);
    }
    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
  }
}

export class RdfDbChannel {
  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
  private ws?: WebSocket = undefined;
  private pinger?: ChannelPinger;
  private connectionId: string = "none"; // server's name for us
  private reconnectTimer?: NodeJS.Timeout = undefined;
  private messagesReceived = 0; // (non-ping messages)
  private messagesSent = 0;

  newConnection: SubEvent<void> = new SubEvent();
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
  statusDisplay: SubEvent<string> = new SubEvent();

  constructor(public patchSenderUrl: string) {
    this.openConnection();
  }
  sendMessage(body: string): boolean {
    // one try, best effort, true if we think it worked
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
      return false;
    }
    log("send patch to server, " + body.length + " bytes");
    this.ws.send(body);
    this.messagesSent++;
    this.updateStatus();
    return true;
  }

  disconnect(why:string) {
    // will be followed by an autoconnect
    log("disconnect requested:", why);
    if (this.ws !== undefined) {
      const closeHandler = this.ws.onclose?.bind(this.ws);
      if (!closeHandler) {
        throw new Error();
      }
      closeHandler(new CloseEvent("forced"));
    }
  }

  private openConnection() {
    const wsOrWss = window.location.protocol.replace("http", "ws");
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
    if (this.ws !== undefined) {
      this.ws.close();
    }
    this.ws = new WebSocket(fullUrl);
    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
    this.ws.onerror = this.onWsError.bind(this);
    this.ws.onclose = this.onWsClose.bind(this);
    this.ws.onmessage = this.onWsMessage.bind(this);
  }

  private onWsOpen(ws: WebSocket) {
    log("new connection to", this.patchSenderUrl);
    this.updateStatus();
    this.newConnection.emit();
    this.pinger = new ChannelPinger(ws);
  }

  private onWsMessage(evt: { data: string }) {
    const msg = evt.data;
    if (msg === "PONG") {
      this.onPong();
      return;
    }
    this.onJson(msg);
  }

  private onPong() {
    if (this.pinger) {
      this.pinger.pong();
      this.updateStatus();
    }
  }

  private onJson(msg: string) {
    const input = JSON.parse(msg);
    if (input.connectedAs) {
      this.connectionId = input.connectedAs;
    } else {
      this.onPatch(input as SyncgraphPatchMessage);
    }
  }

  private onPatch(input: SyncgraphPatchMessage) {
    log(`patch msg from server`);
    this.serverMessage.emit({ evType: "patch", body: input });
    this.messagesReceived++;
    this.updateStatus();
  }

  private onWsError(e: Event) {
    log("ws error", e);
    this.disconnect("ws error");
    this.updateStatus();
  }

  private onWsClose(ev: CloseEvent) {
    log("ws close");
    this.updateStatus();
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
  }

  private updateStatus() {
    const conn = (() => {
      if (this.ws === undefined) {
        return "no";
      } else {
        switch (this.ws.readyState) {
          case this.ws.CONNECTING:
            return "connecting";
          case this.ws.OPEN:
            return `open as ${this.connectionId}`;
          case this.ws.CLOSING:
            return "closing";
          case this.ws.CLOSED:
            return "close";
        }
      }
    })();

    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
  }
}