Files
@ 485148ef5686
Branch filter:
Location: light9/web/RdfDbChannel.ts
485148ef5686
4.4 KiB
video/MP2T
reformat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import debug from "debug";
import { SubEvent } from "sub-events";
import { SyncgraphPatchMessage } from "./patch";
const log = debug("rdfdbclient");
class ChannelPinger {
private timeoutId?: number;
private lastMs: number = 0;
constructor(private ws: WebSocket) {
this._pingLoop();
}
lastPingMs(): number {
return this.lastMs;
}
pong() {
this.lastMs = Date.now() + this.lastMs;
}
_pingLoop() {
if (this.ws.readyState !== this.ws.OPEN) {
return;
}
this.ws.send("PING");
this.lastMs = -Date.now();
if (this.timeoutId != null) {
clearTimeout(this.timeoutId);
}
this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
}
}
export class RdfDbChannel {
// lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
private ws?: WebSocket = undefined;
private pinger?: ChannelPinger;
private connectionId: string = "none"; // server's name for us
private reconnectTimer?: NodeJS.Timeout = undefined;
private messagesReceived = 0; // (non-ping messages)
private messagesSent = 0;
newConnection: SubEvent<void> = new SubEvent();
serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
statusDisplay: SubEvent<string> = new SubEvent();
constructor(public patchSenderUrl: string) {
this.openConnection();
}
sendMessage(body: string): boolean {
// one try, best effort, true if we think it worked
if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
return false;
}
log("send patch to server, " + body.length + " bytes");
this.ws.send(body);
this.messagesSent++;
this.updateStatus();
return true;
}
disconnect(why:string) {
// will be followed by an autoconnect
log("disconnect requested:", why);
if (this.ws !== undefined) {
const closeHandler = this.ws.onclose?.bind(this.ws);
if (!closeHandler) {
throw new Error();
}
closeHandler(new CloseEvent("forced"));
}
}
private openConnection() {
const wsOrWss = window.location.protocol.replace("http", "ws");
const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
if (this.ws !== undefined) {
this.ws.close();
}
this.ws = new WebSocket(fullUrl);
this.ws.onopen = this.onWsOpen.bind(this, this.ws);
this.ws.onerror = this.onWsError.bind(this);
this.ws.onclose = this.onWsClose.bind(this);
this.ws.onmessage = this.onWsMessage.bind(this);
}
private onWsOpen(ws: WebSocket) {
log("new connection to", this.patchSenderUrl);
this.updateStatus();
this.newConnection.emit();
this.pinger = new ChannelPinger(ws);
}
private onWsMessage(evt: { data: string }) {
const msg = evt.data;
if (msg === "PONG") {
this.onPong();
return;
}
this.onJson(msg);
}
private onPong() {
if (this.pinger) {
this.pinger.pong();
this.updateStatus();
}
}
private onJson(msg: string) {
const input = JSON.parse(msg);
if (input.connectedAs) {
this.connectionId = input.connectedAs;
} else {
this.onPatch(input as SyncgraphPatchMessage);
}
}
private onPatch(input: SyncgraphPatchMessage) {
log(`patch msg from server`);
this.serverMessage.emit({ evType: "patch", body: input });
this.messagesReceived++;
this.updateStatus();
}
private onWsError(e: Event) {
log("ws error", e);
this.disconnect("ws error");
this.updateStatus();
}
private onWsClose(ev: CloseEvent) {
log("ws close");
this.updateStatus();
if (this.reconnectTimer !== undefined) {
clearTimeout(this.reconnectTimer);
}
this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
}
private updateStatus() {
const conn = (() => {
if (this.ws === undefined) {
return "no";
} else {
switch (this.ws.readyState) {
case this.ws.CONNECTING:
return "connecting";
case this.ws.OPEN:
return `open as ${this.connectionId}`;
case this.ws.CLOSING:
return "closing";
case this.ws.CLOSED:
return "close";
}
}
})();
const ping = this.pinger ? this.pinger.lastPingMs() : "...";
this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
}
}
|