2256
|
1 import debug from "debug";
|
|
2 import { SubEvent } from "sub-events";
|
|
3 import { SyncgraphPatchMessage } from "./patch";
|
|
4 const log = debug("rdfdbclient");
|
|
5
|
|
6 class ChannelPinger {
|
|
7 private timeoutId?: number;
|
|
8 private lastMs: number = 0;
|
|
9 constructor(private ws: WebSocket) {
|
|
10 this._pingLoop();
|
|
11 }
|
|
12 lastPingMs(): number {
|
|
13 return this.lastMs;
|
|
14 }
|
|
15 pong() {
|
|
16 this.lastMs = Date.now() + this.lastMs;
|
|
17 }
|
|
18 _pingLoop() {
|
|
19 if (this.ws.readyState !== this.ws.OPEN) {
|
|
20 return;
|
|
21 }
|
|
22 this.ws.send("PING");
|
|
23 this.lastMs = -Date.now();
|
|
24
|
|
25 if (this.timeoutId != null) {
|
|
26 clearTimeout(this.timeoutId);
|
|
27 }
|
|
28 this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
|
|
29 }
|
|
30 }
|
|
31
|
|
32 export class RdfDbChannel {
|
|
33 // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
|
|
34 private ws?: WebSocket = undefined;
|
|
35 private pinger?: ChannelPinger;
|
|
36 private connectionId: string = "none"; // server's name for us
|
|
37 private reconnectTimer?: NodeJS.Timeout = undefined;
|
|
38 private messagesReceived = 0; // (non-ping messages)
|
|
39 private messagesSent = 0;
|
|
40
|
|
41 newConnection: SubEvent<void> = new SubEvent();
|
|
42 serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
|
|
43 statusDisplay: SubEvent<string> = new SubEvent();
|
|
44
|
|
45 constructor(public patchSenderUrl: string) {
|
|
46 this.openConnection();
|
|
47 }
|
|
48 sendMessage(body: string): boolean {
|
|
49 // one try, best effort, true if we think it worked
|
|
50 if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
|
|
51 return false;
|
|
52 }
|
|
53 log("send patch to server, " + body.length + " bytes");
|
|
54 this.ws.send(body);
|
2289
|
55 this.messagesSent++;
|
|
56 this.updateStatus();
|
2256
|
57 return true;
|
|
58 }
|
|
59
|
2322
|
60 disconnect(why:string) {
|
2256
|
61 // will be followed by an autoconnect
|
2322
|
62 log("disconnect requested:", why);
|
2256
|
63 if (this.ws !== undefined) {
|
|
64 const closeHandler = this.ws.onclose?.bind(this.ws);
|
|
65 if (!closeHandler) {
|
|
66 throw new Error();
|
|
67 }
|
|
68 closeHandler(new CloseEvent("forced"));
|
|
69 }
|
|
70 }
|
|
71
|
|
72 private openConnection() {
|
|
73 const wsOrWss = window.location.protocol.replace("http", "ws");
|
|
74 const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
|
|
75 if (this.ws !== undefined) {
|
|
76 this.ws.close();
|
|
77 }
|
|
78 this.ws = new WebSocket(fullUrl);
|
|
79 this.ws.onopen = this.onWsOpen.bind(this, this.ws);
|
|
80 this.ws.onerror = this.onWsError.bind(this);
|
|
81 this.ws.onclose = this.onWsClose.bind(this);
|
|
82 this.ws.onmessage = this.onWsMessage.bind(this);
|
|
83 }
|
|
84
|
|
85 private onWsOpen(ws: WebSocket) {
|
|
86 log("new connection to", this.patchSenderUrl);
|
|
87 this.updateStatus();
|
|
88 this.newConnection.emit();
|
|
89 this.pinger = new ChannelPinger(ws);
|
|
90 }
|
|
91
|
|
92 private onWsMessage(evt: { data: string }) {
|
|
93 const msg = evt.data;
|
|
94 if (msg === "PONG") {
|
|
95 this.onPong();
|
|
96 return;
|
|
97 }
|
|
98 this.onJson(msg);
|
|
99 }
|
|
100
|
|
101 private onPong() {
|
|
102 if (this.pinger) {
|
|
103 this.pinger.pong();
|
|
104 this.updateStatus();
|
|
105 }
|
|
106 }
|
|
107
|
|
108 private onJson(msg: string) {
|
|
109 const input = JSON.parse(msg);
|
|
110 if (input.connectedAs) {
|
|
111 this.connectionId = input.connectedAs;
|
|
112 } else {
|
|
113 this.onPatch(input as SyncgraphPatchMessage);
|
|
114 }
|
|
115 }
|
|
116
|
|
117 private onPatch(input: SyncgraphPatchMessage) {
|
2285
|
118 log(`patch msg from server`);
|
2256
|
119 this.serverMessage.emit({ evType: "patch", body: input });
|
|
120 this.messagesReceived++;
|
|
121 this.updateStatus();
|
|
122 }
|
|
123
|
|
124 private onWsError(e: Event) {
|
|
125 log("ws error", e);
|
2322
|
126 this.disconnect("ws error");
|
2256
|
127 this.updateStatus();
|
|
128 }
|
|
129
|
|
130 private onWsClose(ev: CloseEvent) {
|
|
131 log("ws close");
|
|
132 this.updateStatus();
|
|
133 if (this.reconnectTimer !== undefined) {
|
|
134 clearTimeout(this.reconnectTimer);
|
|
135 }
|
|
136 this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
|
|
137 }
|
|
138
|
|
139 private updateStatus() {
|
|
140 const conn = (() => {
|
|
141 if (this.ws === undefined) {
|
|
142 return "no";
|
|
143 } else {
|
|
144 switch (this.ws.readyState) {
|
|
145 case this.ws.CONNECTING:
|
|
146 return "connecting";
|
|
147 case this.ws.OPEN:
|
|
148 return `open as ${this.connectionId}`;
|
|
149 case this.ws.CLOSING:
|
|
150 return "closing";
|
|
151 case this.ws.CLOSED:
|
|
152 return "close";
|
|
153 }
|
|
154 }
|
|
155 })();
|
|
156
|
|
157 const ping = this.pinger ? this.pinger.lastPingMs() : "...";
|
|
158 this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
|
|
159 }
|
|
160 }
|