Mercurial > code > home > repos > light9
comparison web/RdfDbChannel.ts @ 2376:4556eebe5d73
topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
author | drewp@bigasterisk.com |
---|---|
date | Sun, 12 May 2024 19:02:10 -0700 |
parents | light9/web/RdfDbChannel.ts@cdfd2901918a |
children | ac55319a2eac |
comparison
equal
deleted
inserted
replaced
2375:623836db99af | 2376:4556eebe5d73 |
---|---|
1 import debug from "debug"; | |
2 import { SubEvent } from "sub-events"; | |
3 import { SyncgraphPatchMessage } from "./patch"; | |
4 const log = debug("rdfdbclient"); | |
5 | |
/**
 * Sends "PING" over a websocket at a fixed interval and records the
 * round-trip time of the most recently answered ping.
 *
 * The loop starts from the constructor and stops on its own the first time it
 * finds the socket not in the OPEN state; it is never restarted, so a new
 * connection needs a new ChannelPinger.
 */
class ChannelPinger {
  private timeoutId?: ReturnType<typeof setTimeout>;
  /** Round-trip time of the last answered ping, in ms (0 until the first pong). */
  private lastMs = 0;
  /** Date.now() when the outstanding ping was sent; undefined when none is pending. */
  private pingSentAtMs?: number;

  /**
   * @param ws socket to ping; the caller is expected to call pong() when "PONG" arrives
   * @param intervalMs delay between pings in milliseconds
   */
  constructor(private ws: WebSocket, private intervalMs: number = 10000) {
    this._pingLoop();
  }

  /**
   * @returns the last completed round-trip time in ms. While a ping is in
   *   flight this keeps returning the previous measurement rather than a
   *   half-computed value.
   */
  lastPingMs(): number {
    return this.lastMs;
  }

  /** Record that the server answered the outstanding ping. */
  pong(): void {
    if (this.pingSentAtMs === undefined) {
      // Duplicate or unsolicited PONG: there is nothing to measure against.
      return;
    }
    this.lastMs = Date.now() - this.pingSentAtMs;
    this.pingSentAtMs = undefined;
  }

  /** Send one ping and schedule the next; does nothing once the socket is not OPEN. */
  _pingLoop(): void {
    if (this.ws.readyState !== this.ws.OPEN) {
      return;
    }
    this.ws.send("PING");
    this.pingSentAtMs = Date.now();

    if (this.timeoutId !== undefined) {
      clearTimeout(this.timeoutId);
    }
    this.timeoutId = setTimeout(() => this._pingLoop(), this.intervalMs);
  }
}
31 | |
/**
 * Lower level reconnecting websocket -- knows about message types, but not
 * what's inside a patch body.
 *
 * A connection is opened from the constructor. Whenever the socket closes (or
 * disconnect() is called) a new connection attempt is scheduled one second
 * later.
 */
export class RdfDbChannel {
  private ws?: WebSocket = undefined;
  private pinger?: ChannelPinger;
  private connectionId: string = "none"; // server's name for us
  private reconnectTimer?: ReturnType<typeof setTimeout> = undefined;
  private messagesReceived = 0; // (non-ping messages)
  private messagesSent = 0;

  /** Emitted each time a websocket finishes opening. */
  newConnection: SubEvent<void> = new SubEvent();
  /** Emitted for every patch message received from the server. */
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
  /** Emitted with a one-line human-readable connection summary. */
  statusDisplay: SubEvent<string> = new SubEvent();

  /**
   * @param patchSenderUrl path (appended to the page's host) of the websocket endpoint
   */
  constructor(public patchSenderUrl: string) {
    this.openConnection();
  }

  /**
   * One try, best effort.
   *
   * @param body already-serialized message to send
   * @returns true if we think it worked; false if there is no open socket
   */
  sendMessage(body: string): boolean {
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
      return false;
    }
    log("send patch to server, " + body.length + " bytes");
    this.ws.send(body);
    this.messagesSent++;
    this.updateStatus();
    return true;
  }

  /**
   * Treat the current connection as lost. Will be followed by an autoconnect.
   *
   * This does not wait for the socket to close; it runs the socket's close
   * handler immediately with a synthetic event, which schedules the reconnect.
   *
   * @param why reason, used only for logging
   * @throws Error if the current socket has no close handler installed
   */
  disconnect(why: string) {
    log("disconnect requested:", why);
    if (this.ws !== undefined) {
      const closeHandler = this.ws.onclose?.bind(this.ws);
      if (!closeHandler) {
        throw new Error("RdfDbChannel.disconnect: websocket has no onclose handler");
      }
      closeHandler(new CloseEvent("forced"));
    }
  }

  /** Replace the current socket (if any) with a freshly opened one. */
  private openConnection() {
    const wsOrWss = window.location.protocol.replace("http", "ws");
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;

    const stale = this.ws;
    if (stale !== undefined) {
      // Detach before closing: if the old socket was still open (e.g. after
      // disconnect()), its late close event would otherwise schedule another
      // reconnect, and that reconnect would close the socket created below.
      stale.onopen = null;
      stale.onerror = null;
      stale.onclose = null;
      stale.onmessage = null;
      stale.close();
    }

    const ws = new WebSocket(fullUrl);
    this.ws = ws;
    ws.onopen = this.onWsOpen.bind(this, ws);
    ws.onerror = this.onWsError.bind(this);
    ws.onclose = this.onWsClose.bind(this);
    ws.onmessage = this.onWsMessage.bind(this);
  }

  /** Announce the new connection and start pinging it. */
  private onWsOpen(ws: WebSocket) {
    log("new connection to", this.patchSenderUrl);
    this.updateStatus();
    this.newConnection.emit();
    this.pinger = new ChannelPinger(ws);
  }

  /** Route a raw frame: "PONG" goes to the pinger, anything else is parsed as JSON. */
  private onWsMessage(evt: { data: string }) {
    const msg = evt.data;
    if (msg === "PONG") {
      this.onPong();
      return;
    }
    this.onJson(msg);
  }

  private onPong() {
    if (this.pinger) {
      this.pinger.pong();
      this.updateStatus();
    }
  }

  /**
   * Handle a JSON frame: either the server telling us our connection id
   * (`connectedAs`) or a patch message. Frames that are not a JSON object are
   * logged and dropped instead of throwing out of the socket's message handler.
   */
  private onJson(msg: string) {
    let input: unknown;
    try {
      input = JSON.parse(msg);
    } catch (err) {
      log("ignoring unparseable message from server", err);
      return;
    }
    if (typeof input !== "object" || input === null) {
      log("ignoring non-object message from server");
      return;
    }

    const connectedAs = (input as { connectedAs?: string }).connectedAs;
    if (connectedAs) {
      this.connectionId = connectedAs;
      // Refresh now so the status line shows the id without waiting for the
      // next ping or patch.
      this.updateStatus();
    } else {
      this.onPatch(input as SyncgraphPatchMessage);
    }
  }

  private onPatch(input: SyncgraphPatchMessage) {
    log(`patch msg from server`);
    this.serverMessage.emit({ evType: "patch", body: input });
    this.messagesReceived++;
    this.updateStatus();
  }

  private onWsError(e: Event) {
    log("ws error", e);
    this.disconnect("ws error");
    this.updateStatus();
  }

  /** Schedule a reconnect one second from now, replacing any pending one. */
  private onWsClose(_ev: CloseEvent) {
    log("ws close");
    this.updateStatus();
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
    }
    this.reconnectTimer = setTimeout(() => this.openConnection(), 1000);
  }

  /** Emit a fresh summary string on statusDisplay. */
  private updateStatus() {
    const conn = ((): string => {
      const ws = this.ws;
      if (ws === undefined) {
        return "no";
      }
      switch (ws.readyState) {
        case ws.CONNECTING:
          return "connecting";
        case ws.OPEN:
          return `open as ${this.connectionId}`;
        case ws.CLOSING:
          return "closing";
        case ws.CLOSED:
          return "close";
        default:
          // readyState is always one of the four constants above; this only
          // keeps the function total for the type checker.
          return "unknown";
      }
    })();

    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
  }
}