Files @ 69ca2b2fc133
Branch filter:

Location: light9/web/RdfDbChannel.ts

drewp@bigasterisk.com
overcomplicated attempt at persisting the pane layout in the rdf graph

This was hard because we have to somehow wait for the graph to load before configuring the panes.
import debug from "debug";
import { SubEvent } from "sub-events";
import { SyncgraphPatchMessage } from "./patch";
const log = debug("rdfdbclient");

/**
 * Sends "PING" over a websocket every 10 seconds while it is open and measures
 * how long it takes until the owner reports the reply via pong().
 *
 * The loop starts in the constructor and ends by itself the first time it finds
 * the socket not OPEN; it is not restarted.
 */
class ChannelPinger {
  private timeoutId?: ReturnType<typeof setTimeout>;
  // Most recent completed round trip, in ms. 0 until the first pong arrives.
  private lastMs: number = 0;
  // Date.now() at which the outstanding ping was sent; undefined when no ping is pending.
  private pingSentAt?: number;

  /**
   * @param ws socket to ping; the first PING is sent immediately if it is OPEN.
   */
  constructor(private ws: WebSocket) {
    this._pingLoop();
  }

  /**
   * @returns the last completed ping round-trip time in milliseconds (0 before
   *   any pong has been received). Never reflects a ping that is still in flight.
   */
  lastPingMs(): number {
    return this.lastMs;
  }

  /**
   * Record that the reply to the outstanding ping arrived. A pong with no ping
   * pending (duplicate or unsolicited) is ignored so it cannot corrupt the
   * previous measurement.
   */
  pong() {
    if (this.pingSentAt === undefined) {
      return;
    }
    this.lastMs = Date.now() - this.pingSentAt;
    this.pingSentAt = undefined;
  }

  /** Send one PING and schedule the next; does nothing once the socket is not OPEN. */
  _pingLoop() {
    if (this.ws.readyState !== this.ws.OPEN) {
      return;
    }
    this.ws.send("PING");
    this.pingSentAt = Date.now();

    // Guard against two loops running if _pingLoop is ever invoked manually.
    if (this.timeoutId != null) {
      clearTimeout(this.timeoutId);
    }
    this.timeoutId = setTimeout(() => this._pingLoop(), 10000);
  }
}

/**
 * Lower level reconnecting websocket -- knows about message types, but not
 * what's inside a patch body.
 *
 * A connection is opened from the constructor. Whenever the socket closes,
 * errors, or disconnect() is called, a new connection attempt is scheduled one
 * second later.
 *
 * Events:
 *  - newConnection: emitted each time a socket reaches the open state.
 *  - serverMessage: emitted for every JSON message that is not the
 *    `connectedAs` greeting; evType is always "patch".
 *  - statusDisplay: emitted with a one-line human readable status string.
 */
export class RdfDbChannel {
  private ws?: WebSocket = undefined;
  private pinger?: ChannelPinger;
  private connectionId: string = "none"; // server's name for us
  private reconnectTimer?: ReturnType<typeof setTimeout> = undefined;
  private messagesReceived = 0; // (non-ping messages)
  private messagesSent = 0;

  newConnection: SubEvent<void> = new SubEvent();
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
  statusDisplay: SubEvent<string> = new SubEvent();

  /**
   * @param patchSenderUrl path (starting with "/") appended to the page's host
   *   to form the ws:// or wss:// url.
   */
  constructor(public patchSenderUrl: string) {
    this.openConnection();
  }

  /**
   * One try, best effort.
   *
   * @param body already-serialized message to send.
   * @returns true if the socket was open and the message was handed to it.
   */
  sendMessage(body: string): boolean {
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
      return false;
    }
    log("send patch to server, " + body.length + " bytes");
    this.ws.send(body);
    this.messagesSent++;
    this.updateStatus();
    return true;
  }

  /**
   * Drop the current connection. Will be followed by an autoconnect.
   *
   * @param why reason, used for logging only.
   */
  disconnect(why: string) {
    log("disconnect requested:", why);
    if (this.ws === undefined) {
      return;
    }
    // Really close the socket, and stop listening to it first so its own close
    // event can't schedule a second reconnect that would tear down the
    // replacement connection.
    this.dropSocket(this.ws);
    this.onWsClose();
  }

  /** Detach all of our handlers from `ws` and close it. */
  private dropSocket(ws: WebSocket) {
    ws.onopen = null;
    ws.onerror = null;
    ws.onclose = null;
    ws.onmessage = null;
    ws.close();
  }

  /** Replace the current socket (if any) with a fresh connection to patchSenderUrl. */
  private openConnection() {
    const wsOrWss = window.location.protocol.replace("http", "ws");
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
    if (this.ws !== undefined) {
      this.dropSocket(this.ws);
    }
    const ws = new WebSocket(fullUrl);
    ws.onopen = () => this.onWsOpen(ws);
    ws.onerror = (e) => this.onWsError(e);
    ws.onclose = () => this.onWsClose();
    ws.onmessage = (evt) => this.onWsMessage(evt);
    this.ws = ws;
  }

  private onWsOpen(ws: WebSocket) {
    log("new connection to", this.patchSenderUrl);
    this.updateStatus();
    this.newConnection.emit();
    // The pinger sends its first PING right away and keeps going while ws is open.
    this.pinger = new ChannelPinger(ws);
  }

  /** Route a raw frame: "PONG" goes to the pinger, everything else is parsed as JSON. */
  private onWsMessage(evt: { data: string }) {
    const msg = evt.data;
    if (msg === "PONG") {
      this.onPong();
      return;
    }
    this.onJson(msg);
  }

  private onPong() {
    if (this.pinger) {
      this.pinger.pong();
      this.updateStatus();
    }
  }

  /**
   * Handle a JSON frame. A message with a truthy `connectedAs` is the server
   * telling us our connection id; anything else is treated as a patch.
   * Throws whatever JSON.parse throws if the frame is not valid JSON.
   */
  private onJson(msg: string) {
    const input = JSON.parse(msg);
    if (input.connectedAs) {
      this.connectionId = input.connectedAs;
    } else {
      this.onPatch(input as SyncgraphPatchMessage);
    }
  }

  private onPatch(input: SyncgraphPatchMessage) {
    log(`patch msg from server`);
    this.serverMessage.emit({ evType: "patch", body: input });
    this.messagesReceived++;
    this.updateStatus();
  }

  private onWsError(e: Event) {
    log("ws error", e);
    this.disconnect("ws error");
    this.updateStatus();
  }

  /** Common close path: publish status and (re)schedule a reconnect in one second. */
  private onWsClose() {
    log("ws close");
    this.updateStatus();
    // Restart the countdown rather than stacking several pending reconnects.
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = setTimeout(() => this.openConnection(), 1000);
  }

  /** Emit a status line: connection state, message counters, and last ping time. */
  private updateStatus() {
    const conn = (() => {
      if (this.ws === undefined) {
        return "no";
      } else {
        switch (this.ws.readyState) {
          case this.ws.CONNECTING:
            return "connecting";
          case this.ws.OPEN:
            return `open as ${this.connectionId}`;
          case this.ws.CLOSING:
            return "closing";
          case this.ws.CLOSED:
            return "close";
        }
      }
    })();

    // "..." until the first connection has opened and created a pinger.
    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
  }
}