Mercurial > code > home > repos > light9
comparison web/rdfdbclient.ts @ 2421:ac55319a2eac
don't drop patches that arrive before we get WS connected
author | drewp@bigasterisk.com |
---|---|
date | Tue, 21 May 2024 16:10:39 -0700 |
parents | 4556eebe5d73 |
children |
comparison
equal
deleted
inserted
replaced
2420:d5750b2aaa9e | 2421:ac55319a2eac |
---|---|
3 import { RdfDbChannel } from "./RdfDbChannel"; | 3 import { RdfDbChannel } from "./RdfDbChannel"; |
4 const log = debug("rdfdbclient"); | 4 const log = debug("rdfdbclient"); |
5 | 5 |
6 export class RdfDbClient { | 6 export class RdfDbClient { |
7 private channel: RdfDbChannel; | 7 private channel: RdfDbChannel; |
8 _patchesToSend: Patch[]; | 8 private patchesToSend: Patch[]; |
9 // Send and receive patches from rdfdb. Primarily used in SyncedGraph. | 9 // Send and receive patches from rdfdb. Primarily used in SyncedGraph. |
10 // | 10 // |
11 // What this should do, and does not yet, is keep the graph | 11 // What this should do, and does not yet, is keep the graph |
12 // 'coasting' over a reconnect, applying only the diffs from the old | 12 // 'coasting' over a reconnect, applying only the diffs from the old |
13 // contents to the new ones once they're in. Then, remove all the | 13 // contents to the new ones once they're in. Then, remove all the |
17 patchSenderUrl: string, | 17 patchSenderUrl: string, |
18 private clearGraphOnNewConnection: () => void, | 18 private clearGraphOnNewConnection: () => void, |
19 private applyPatch: (p: Patch) => void, | 19 private applyPatch: (p: Patch) => void, |
20 setStatus: (status: string) => void | 20 setStatus: (status: string) => void |
21 ) { | 21 ) { |
22 this._patchesToSend = []; | 22 this.patchesToSend = []; |
23 this.channel = new RdfDbChannel(patchSenderUrl); | 23 this.channel = new RdfDbChannel(patchSenderUrl); |
24 this.channel.statusDisplay.subscribe((st: string) => { | 24 this.channel.statusDisplay.subscribe((st: string) => { |
25 setStatus(st + `; ${this._patchesToSend.length} pending `); | 25 setStatus(st + `; ${this.patchesToSend.length} pending `); |
26 }); | 26 }); |
27 this.channel.newConnection.subscribe(() => { | 27 this.channel.newConnection.subscribe(() => { |
28 this.clearGraphOnNewConnection(); | 28 this.clearGraphOnNewConnection(); |
29 }); | 29 }); |
30 this.channel.serverMessage.subscribe((m) => { | 30 this.channel.serverMessage.subscribe((m) => { |
38 }); | 38 }); |
39 } | 39 } |
40 | 40 |
41 sendPatch(patch: Patch) { | 41 sendPatch(patch: Patch) { |
42 log("queue patch to server ", patch.summary()); | 42 log("queue patch to server ", patch.summary()); |
43 this._patchesToSend.push(patch); | 43 this.patchesToSend.push(patch); |
44 this._continueSending(); | 44 this._continueSending(); |
45 } | 45 } |
46 | 46 |
47 disconnect(why:string) { | 47 disconnect(why:string) { |
48 this.channel.disconnect(why); | 48 this.channel.disconnect(why); |
49 } | 49 } |
50 | 50 |
51 async _continueSending() { | 51 async _continueSending() { |
52 // we could call this less often and coalesce patches together to optimize | 52 // we could call this less often and coalesce patches together to optimize |
53 // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'. | 53 // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'. |
54 while (this._patchesToSend.length) { | 54 while (this.patchesToSend.length) { |
55 const patch = this._patchesToSend.splice(0, 1)[0]; | 55 const patch = this.patchesToSend.splice(0, 1)[0]; |
56 const json = await patch.toJsonPatch(); | 56 const json = await patch.toJsonPatch(); |
57 const ret = this.channel.sendMessage(json); | 57 const ret = this.channel.sendMessage(json); |
58 if (!ret) { | 58 if (!ret) { |
59 log('sendMessage failed- retrying') | |
60 this.patchesToSend.unshift(patch); | |
59 setTimeout(this._continueSending.bind(this), 500); | 61 setTimeout(this._continueSending.bind(this), 500); |
60 | 62 |
61 // this.disconnect() | 63 // this.disconnect() |
62 return; | 64 return; |
63 } | 65 } |