Files @ e3af0ac507c8
Branch filter:

Location: light9/web/RdfDbChannel.ts - annotation

drewp@bigasterisk.com
new exposure-finder algorithm
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
4556eebe5d73
import debug from "debug";
import { SubEvent } from "sub-events";
import { SyncgraphPatchMessage } from "./patch";
// Logger from the `debug` package, using the "rdfdbclient" namespace.
const log = debug("rdfdbclient");

/**
 * Sends a "PING" text frame over a websocket every 10 seconds (for as long as
 * the socket is open) and measures the round trip when the owner reports the
 * matching reply via `pong()`.
 */
class ChannelPinger {
  private timeoutId?: ReturnType<typeof setTimeout>;
  // Round-trip time in ms of the most recent completed ping; 0 until the first pong.
  private lastMs: number = 0;
  // Date.now() when the in-flight ping was sent; undefined when none is outstanding.
  private pingSentMs?: number;

  /** Starts pinging immediately; does nothing if `ws` is not open. */
  constructor(private ws: WebSocket) {
    this._pingLoop();
  }

  /**
   * @returns the round-trip time in ms of the last ping that was answered.
   * Stays at the previous value (never a negative placeholder) while a ping
   * is still in flight.
   */
  lastPingMs(): number {
    return this.lastMs;
  }

  /** Record that the reply to the outstanding ping has arrived. */
  pong() {
    if (this.pingSentMs === undefined) {
      // Unmatched or duplicate reply: keep the last good measurement.
      return;
    }
    this.lastMs = Date.now() - this.pingSentMs;
    this.pingSentMs = undefined;
  }

  /** Send one ping and schedule the next; the loop ends once the socket is not open. */
  _pingLoop() {
    if (this.ws.readyState !== this.ws.OPEN) {
      return;
    }
    this.ws.send("PING");
    this.pingSentMs = Date.now();

    // Only one pending timer at a time, even if this is called manually.
    if (this.timeoutId !== undefined) {
      clearTimeout(this.timeoutId);
    }
    this.timeoutId = setTimeout(() => this._pingLoop(), 10000);
  }
}

/**
 * Lower level reconnecting websocket -- knows about message types, but not
 * what's inside a patch body.
 *
 * Opens a websocket to `patchSenderUrl` on the current page's host, reopens it
 * one second after it closes or errors, and reports traffic through three
 * events:
 *  - `newConnection`: a socket finished opening
 *  - `serverMessage`: a JSON message without `connectedAs` arrived
 *  - `statusDisplay`: one-line human-readable connection summary
 */
export class RdfDbChannel {
  private ws?: WebSocket = undefined;
  private pinger?: ChannelPinger;
  private connectionId: string = "none"; // server's name for us
  private reconnectTimer?: ReturnType<typeof setTimeout> = undefined;
  private messagesReceived = 0; // (non-ping messages)
  private messagesSent = 0;

  newConnection: SubEvent<void> = new SubEvent();
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
  statusDisplay: SubEvent<string> = new SubEvent();

  /** @param patchSenderUrl path (appended to the page's host) of the websocket endpoint */
  constructor(public patchSenderUrl: string) {
    this.openConnection();
  }

  /**
   * One try, best effort.
   * @returns true if the socket was open and the body was handed to it
   */
  sendMessage(body: string): boolean {
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
      return false;
    }
    log("send patch to server, " + body.length + " bytes");
    this.ws.send(body);
    this.messagesSent++;
    this.updateStatus();
    return true;
  }

  /**
   * Abandon the current connection; will be followed by an autoconnect.
   * The socket itself is closed when the replacement is opened.
   */
  disconnect(why: string) {
    log("disconnect requested:", why);
    if (this.ws === undefined) {
      return;
    }
    // Same path a real close event takes: refresh status and (re)schedule the reconnect.
    this.onWsClose();
  }

  /** Replace the current socket (if any) with a fresh one and wire up its handlers. */
  private openConnection() {
    const wsOrWss = window.location.protocol.replace("http", "ws");
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;

    const previous = this.ws;
    if (previous !== undefined) {
      // Detach first: otherwise the old socket's close event would arrive after
      // the new socket exists and schedule yet another reconnect, tearing down
      // the healthy connection over and over.
      previous.onopen = null;
      previous.onerror = null;
      previous.onclose = null;
      previous.onmessage = null;
      previous.close();
    }

    const ws = new WebSocket(fullUrl);
    this.ws = ws;
    ws.onopen = this.onWsOpen.bind(this, ws);
    ws.onerror = this.onWsError.bind(this);
    ws.onclose = this.onWsClose.bind(this);
    ws.onmessage = this.onWsMessage.bind(this);
  }

  private onWsOpen(ws: WebSocket) {
    log("new connection to", this.patchSenderUrl);
    this.updateStatus();
    this.newConnection.emit();
    // Each socket gets its own pinger; an old one stops once its socket is no longer open.
    this.pinger = new ChannelPinger(ws);
  }

  /** Route an incoming frame: "PONG" goes to the pinger, everything else is JSON. */
  private onWsMessage(evt: { data: string }) {
    const msg = evt.data;
    if (msg === "PONG") {
      this.onPong();
      return;
    }
    this.onJson(msg);
  }

  private onPong() {
    if (this.pinger) {
      this.pinger.pong();
      this.updateStatus();
    }
  }

  /** Handle a JSON frame: either the server naming our connection, or a patch. */
  private onJson(msg: string) {
    let input;
    try {
      input = JSON.parse(msg);
    } catch (err) {
      // A malformed frame should not raise out of the websocket handler.
      log("ignoring unparseable message from server", err);
      return;
    }
    if (typeof input !== "object" || input === null) {
      log("ignoring non-object message from server");
      return;
    }
    if (input.connectedAs) {
      this.connectionId = input.connectedAs;
    } else {
      this.onPatch(input as SyncgraphPatchMessage);
    }
  }

  private onPatch(input: SyncgraphPatchMessage) {
    log(`patch msg from server`);
    this.serverMessage.emit({ evType: "patch", body: input });
    this.messagesReceived++;
    this.updateStatus();
  }

  private onWsError(e: Event) {
    log("ws error", e);
    this.disconnect("ws error");
    this.updateStatus();
  }

  /**
   * Runs on a real close event and on a forced disconnect: schedules a single
   * reconnect one second out, replacing any reconnect already pending.
   */
  private onWsClose(_ev?: CloseEvent) {
    log("ws close");
    this.updateStatus();
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = setTimeout(() => this.openConnection(), 1000);
  }

  /** Emit the status line: socket state, message counters, and last ping time. */
  private updateStatus() {
    const conn = (() => {
      if (this.ws === undefined) {
        return "no";
      } else {
        switch (this.ws.readyState) {
          case this.ws.CONNECTING:
            return "connecting";
          case this.ws.OPEN:
            return `open as ${this.connectionId}`;
          case this.ws.CLOSING:
            return "closing";
          case this.ws.CLOSED:
            return "close";
        }
      }
    })();

    // "..." until the first socket has opened and created a pinger.
    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
  }
}