comparison web/rdfdbclient.ts @ 2421:ac55319a2eac

Don't drop patches that arrive before the WS connection is up: requeue the unsent patch and retry.
author drewp@bigasterisk.com
date Tue, 21 May 2024 16:10:39 -0700
parents 4556eebe5d73
children
comparison
equal deleted inserted replaced
2420:d5750b2aaa9e 2421:ac55319a2eac
3 import { RdfDbChannel } from "./RdfDbChannel"; 3 import { RdfDbChannel } from "./RdfDbChannel";
4 const log = debug("rdfdbclient"); 4 const log = debug("rdfdbclient");
5 5
6 export class RdfDbClient { 6 export class RdfDbClient {
7 private channel: RdfDbChannel; 7 private channel: RdfDbChannel;
8 _patchesToSend: Patch[]; 8 private patchesToSend: Patch[];
9 // Send and receive patches from rdfdb. Primarily used in SyncedGraph. 9 // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
10 // 10 //
11 // What this should do, and does not yet, is keep the graph 11 // What this should do, and does not yet, is keep the graph
12 // 'coasting' over a reconnect, applying only the diffs from the old 12 // 'coasting' over a reconnect, applying only the diffs from the old
13 // contents to the new ones once they're in. Then, remove all the 13 // contents to the new ones once they're in. Then, remove all the
17 patchSenderUrl: string, 17 patchSenderUrl: string,
18 private clearGraphOnNewConnection: () => void, 18 private clearGraphOnNewConnection: () => void,
19 private applyPatch: (p: Patch) => void, 19 private applyPatch: (p: Patch) => void,
20 setStatus: (status: string) => void 20 setStatus: (status: string) => void
21 ) { 21 ) {
22 this._patchesToSend = []; 22 this.patchesToSend = [];
23 this.channel = new RdfDbChannel(patchSenderUrl); 23 this.channel = new RdfDbChannel(patchSenderUrl);
24 this.channel.statusDisplay.subscribe((st: string) => { 24 this.channel.statusDisplay.subscribe((st: string) => {
25 setStatus(st + `; ${this._patchesToSend.length} pending `); 25 setStatus(st + `; ${this.patchesToSend.length} pending `);
26 }); 26 });
27 this.channel.newConnection.subscribe(() => { 27 this.channel.newConnection.subscribe(() => {
28 this.clearGraphOnNewConnection(); 28 this.clearGraphOnNewConnection();
29 }); 29 });
30 this.channel.serverMessage.subscribe((m) => { 30 this.channel.serverMessage.subscribe((m) => {
38 }); 38 });
39 } 39 }
40 40
41 sendPatch(patch: Patch) { 41 sendPatch(patch: Patch) {
42 log("queue patch to server ", patch.summary()); 42 log("queue patch to server ", patch.summary());
43 this._patchesToSend.push(patch); 43 this.patchesToSend.push(patch);
44 this._continueSending(); 44 this._continueSending();
45 } 45 }
46 46
47 disconnect(why:string) { 47 disconnect(why:string) {
48 this.channel.disconnect(why); 48 this.channel.disconnect(why);
49 } 49 }
50 50
51 async _continueSending() { 51 async _continueSending() {
52 // we could call this less often and coalesce patches together to optimize 52 // we could call this less often and coalesce patches together to optimize
53 // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'. 53 // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'.
54 while (this._patchesToSend.length) { 54 while (this.patchesToSend.length) {
55 const patch = this._patchesToSend.splice(0, 1)[0]; 55 const patch = this.patchesToSend.splice(0, 1)[0];
56 const json = await patch.toJsonPatch(); 56 const json = await patch.toJsonPatch();
57 const ret = this.channel.sendMessage(json); 57 const ret = this.channel.sendMessage(json);
58 if (!ret) { 58 if (!ret) {
59 log('sendMessage failed- retrying')
60 this.patchesToSend.unshift(patch);
59 setTimeout(this._continueSending.bind(this), 500); 61 setTimeout(this._continueSending.bind(this), 500);
60 62
61 // this.disconnect() 63 // this.disconnect()
62 return; 64 return;
63 } 65 }