Mercurial > code > home > repos > light9
annotate web/RdfDbChannel.ts @ 2440:d1f86109e3cc
more *value getter variants
author | drewp@bigasterisk.com |
---|---|
date | Thu, 30 May 2024 01:08:45 -0700 |
parents | ac55319a2eac |
children |
rev | line source |
---|---|
2256 | 1 import debug from "debug"; |
2 import { SubEvent } from "sub-events"; | |
3 import { SyncgraphPatchMessage } from "./patch"; | |
4 const log = debug("rdfdbclient"); | |
5 | |
6 class ChannelPinger { | |
7 private timeoutId?: number; | |
8 private lastMs: number = 0; | |
9 constructor(private ws: WebSocket) { | |
10 this._pingLoop(); | |
11 } | |
12 lastPingMs(): number { | |
13 return this.lastMs; | |
14 } | |
15 pong() { | |
16 this.lastMs = Date.now() + this.lastMs; | |
17 } | |
18 _pingLoop() { | |
19 if (this.ws.readyState !== this.ws.OPEN) { | |
20 return; | |
21 } | |
22 this.ws.send("PING"); | |
23 this.lastMs = -Date.now(); | |
24 | |
25 if (this.timeoutId != null) { | |
26 clearTimeout(this.timeoutId); | |
27 } | |
28 this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number; | |
29 } | |
30 } | |
31 | |
32 export class RdfDbChannel { | |
33 // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body | |
34 private ws?: WebSocket = undefined; | |
35 private pinger?: ChannelPinger; | |
36 private connectionId: string = "none"; // server's name for us | |
37 private reconnectTimer?: NodeJS.Timeout = undefined; | |
38 private messagesReceived = 0; // (non-ping messages) | |
39 private messagesSent = 0; | |
40 | |
41 newConnection: SubEvent<void> = new SubEvent(); | |
42 serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent(); | |
43 statusDisplay: SubEvent<string> = new SubEvent(); | |
44 | |
45 constructor(public patchSenderUrl: string) { | |
46 this.openConnection(); | |
47 } | |
48 sendMessage(body: string): boolean { | |
49 // one try, best effort, true if we think it worked | |
50 if (!this.ws || this.ws.readyState !== this.ws.OPEN) { | |
2421
ac55319a2eac
don't drop patches that arrive before we get WS connected
drewp@bigasterisk.com
parents:
2376
diff
changeset
|
51 log("dropping message: " + body); |
2256 | 52 return false; |
53 } | |
54 log("send patch to server, " + body.length + " bytes"); | |
55 this.ws.send(body); | |
2289 | 56 this.messagesSent++; |
57 this.updateStatus(); | |
2256 | 58 return true; |
59 } | |
60 | |
2322 | 61 disconnect(why:string) { |
2256 | 62 // will be followed by an autoconnect |
2322 | 63 log("disconnect requested:", why); |
2256 | 64 if (this.ws !== undefined) { |
65 const closeHandler = this.ws.onclose?.bind(this.ws); | |
66 if (!closeHandler) { | |
67 throw new Error(); | |
68 } | |
69 closeHandler(new CloseEvent("forced")); | |
70 } | |
71 } | |
72 | |
73 private openConnection() { | |
74 const wsOrWss = window.location.protocol.replace("http", "ws"); | |
75 const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl; | |
76 if (this.ws !== undefined) { | |
77 this.ws.close(); | |
78 } | |
79 this.ws = new WebSocket(fullUrl); | |
80 this.ws.onopen = this.onWsOpen.bind(this, this.ws); | |
81 this.ws.onerror = this.onWsError.bind(this); | |
82 this.ws.onclose = this.onWsClose.bind(this); | |
83 this.ws.onmessage = this.onWsMessage.bind(this); | |
84 } | |
85 | |
86 private onWsOpen(ws: WebSocket) { | |
87 log("new connection to", this.patchSenderUrl); | |
88 this.updateStatus(); | |
89 this.newConnection.emit(); | |
90 this.pinger = new ChannelPinger(ws); | |
91 } | |
92 | |
93 private onWsMessage(evt: { data: string }) { | |
94 const msg = evt.data; | |
95 if (msg === "PONG") { | |
96 this.onPong(); | |
97 return; | |
98 } | |
99 this.onJson(msg); | |
100 } | |
101 | |
102 private onPong() { | |
103 if (this.pinger) { | |
104 this.pinger.pong(); | |
105 this.updateStatus(); | |
106 } | |
107 } | |
108 | |
109 private onJson(msg: string) { | |
110 const input = JSON.parse(msg); | |
111 if (input.connectedAs) { | |
112 this.connectionId = input.connectedAs; | |
113 } else { | |
114 this.onPatch(input as SyncgraphPatchMessage); | |
115 } | |
116 } | |
117 | |
118 private onPatch(input: SyncgraphPatchMessage) { | |
2285 | 119 log(`patch msg from server`); |
2256 | 120 this.serverMessage.emit({ evType: "patch", body: input }); |
121 this.messagesReceived++; | |
122 this.updateStatus(); | |
123 } | |
124 | |
125 private onWsError(e: Event) { | |
126 log("ws error", e); | |
2322 | 127 this.disconnect("ws error"); |
2256 | 128 this.updateStatus(); |
129 } | |
130 | |
131 private onWsClose(ev: CloseEvent) { | |
132 log("ws close"); | |
133 this.updateStatus(); | |
134 if (this.reconnectTimer !== undefined) { | |
135 clearTimeout(this.reconnectTimer); | |
136 } | |
137 this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000); | |
138 } | |
139 | |
140 private updateStatus() { | |
141 const conn = (() => { | |
142 if (this.ws === undefined) { | |
143 return "no"; | |
144 } else { | |
145 switch (this.ws.readyState) { | |
146 case this.ws.CONNECTING: | |
147 return "connecting"; | |
148 case this.ws.OPEN: | |
149 return `open as ${this.connectionId}`; | |
150 case this.ws.CLOSING: | |
151 return "closing"; | |
152 case this.ws.CLOSED: | |
153 return "close"; | |
154 } | |
155 } | |
156 })(); | |
157 | |
158 const ping = this.pinger ? this.pinger.lastPingMs() : "..."; | |
159 this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`); | |
160 } | |
161 } |