Mercurial > code > home > repos > streamed-graph
comparison src/layout/StreamedGraphClient.ts @ 107:042bd3361339
renames
author | drewp@bigasterisk.com |
---|---|
date | Sun, 13 Mar 2022 22:02:30 -0700 |
parents | src/layout/streamed_graph_client.ts@2468f2227d22 |
children | 5a1a79f54779 |
comparison
equal
deleted
inserted
replaced
106:2468f2227d22 | 107:042bd3361339 |
---|---|
1 import { eachJsonLdQuad } from "./json_ld_quads"; | |
2 import { Store } from "n3"; | |
3 | |
/**
 * Holds an n3 Store which is synced to a server-side store that sends
 * updates over SSE (an `EventSource`).
 *
 * Two server events are handled: `fullGraph` replaces the whole local store,
 * and `patch` applies a set of deletes followed by a set of adds.
 */
export class StreamedGraphClient {
  /** Called with a short human-readable message whenever the sync state changes. */
  onStatus: (msg: string) => void = function (m) {};
  /** Called after the local store has been updated from a server event. */
  onGraphChanged: () => void = function () {};
  store: Store;
  /** Quads removed since the last `_vacuum()`; drives the periodic store rebuild. */
  _deletedCount: number = 0;
  events!: EventSource;

  // Handles for the single pending reconnect attempt, if any.
  private _retryTimer: ReturnType<typeof setTimeout> | undefined;
  private _retryFrame: number | undefined;
  // Tail of the update queue; graph mutations are chained onto it so they
  // never interleave across their internal awaits.
  private _updates: Promise<void> = Promise.resolve();

  /**
   * @param eventsUrl URL of the SSE stream to connect to.
   * @param onGraphChanged Callback fired after each applied update.
   * @param onStatus Callback receiving status messages.
   * @param prefixes Currently unused (the code applying them is commented out).
   * @param staticGraphUrls Currently unused (the loading code is commented out).
   */
  constructor(
    eventsUrl: string,
    onGraphChanged: () => void,
    onStatus: (status: string) => void,
    prefixes: Array<Record<string, string>>,
    staticGraphUrls: Array<string>
  ) {
    console.log("new StreamedGraph", eventsUrl);
    this.onStatus = onStatus;
    this.onGraphChanged = onGraphChanged;
    this.onStatus("startup...");

    this.store = new Store();

    //     Object.keys(prefixes).forEach((prefix) => {
    //       this.store.setPrefix(prefix, prefixes[prefix]);
    //     });

    this.connect(eventsUrl);
    this.reconnectOnWake();

    //     staticGraphUrls.forEach((url) => {
    //       fetch(url).then((response) => response.text())
    //         .then((body) => {
    //           // parse with n3, add to output
    //         });
    //     });
  }

  /** Rebuilds the store from its current quads. */
  _vacuum(): void {
    // workaround for the growing _ids map
    this.store = new Store(this.store.getQuads(null, null, null, null));
  }

  /** Placeholder: no wake detection is implemented yet. */
  reconnectOnWake(): void {
    // it's not this, which fires on every mouse-in on a browser window, and doesn't seem to work for screen-turned-back-on
    //window.addEventListener('focus', function() { this.connect(eventsUrl); }.bind(this));
  }

  /**
   * Closes any existing stream and opens a new `EventSource` on `eventsUrl`.
   *
   * On a stream error the failed source is closed and exactly one reconnect
   * is scheduled (3 seconds, then the next animation frame).
   *
   * @throws Error if the previous event source is still not closed after `close()`.
   */
  connect(eventsUrl: string): void {
    // need to exit here if this obj has been replaced

    this.onStatus("start connect...");
    this.close();
    if (this.events && this.events.readyState !== EventSource.CLOSED) {
      this.onStatus("zombie");
      throw new Error("zombie eventsource");
    }

    const source = new EventSource(eventsUrl, { withCredentials: true });
    this.events = source;

    source.addEventListener("error", () => {
      // Ignore errors from a source that a later connect() already replaced.
      if (this.events !== source) {
        return;
      }
      // Close the failed source so the browser's built-in auto-reconnect
      // cannot race with (and multiply) our own retry below.
      source.close();
      // Fire-and-forget probe; its rejection is logged rather than left unhandled.
      this.testEventUrl(eventsUrl).catch((err: unknown) => {
        console.warn("event url test failed", err);
      });
      this.onStatus("connection lost- retrying");
      this._scheduleReconnect(eventsUrl);
    });

    source.addEventListener("fullGraph", async (ev) => {
      this.onStatus("sync- full graph update");
      try {
        await this.replaceFullGraph((ev as MessageEvent).data);
      } catch (err: unknown) {
        this._reportSyncFailure(err);
        return;
      }
      this.onStatus(`synced ${this.store.size}`);
      this.onGraphChanged();
    });

    source.addEventListener("patch", async (ev) => {
      this.onStatus("sync- updating");
      try {
        await this.patchGraph((ev as MessageEvent).data);
      } catch (err: unknown) {
        this._reportSyncFailure(err);
        return;
      }
      window.setTimeout(() => {
        this.onStatus(`synced ${this.store.size}`);
      }, 60);
      this.onGraphChanged();
    });
    this.onStatus("connecting...");
  }

  /**
   * Replaces the whole local graph with the quads in `jsonLdText`.
   *
   * The new store is filled off to the side and only installed once complete,
   * so readers never see a half-loaded graph and a failed parse leaves the
   * previous graph in place. Runs after any update already in progress.
   *
   * @param jsonLdText JSON text which is parsed and passed to `eachJsonLdQuad`.
   */
  async replaceFullGraph(jsonLdText: string): Promise<void> {
    await this._serialize(async () => {
      const fresh = new Store();
      await eachJsonLdQuad(JSON.parse(jsonLdText), fresh.addQuad.bind(fresh));
      this.store = fresh;
    });
  }

  /**
   * Applies a patch to the local graph: all deletes first, then all adds.
   * After more than 100 accumulated deletes the store is vacuumed.
   * Runs after any update already in progress.
   *
   * @param patchJson JSON text whose `patch` member carries `deletes` and `adds`.
   */
  async patchGraph(patchJson: string): Promise<void> {
    await this._serialize(async () => {
      const patch = JSON.parse(patchJson).patch;

      await eachJsonLdQuad(patch.deletes, (quad) => {
        this.store.removeQuad(quad);
        this._deletedCount++;
      });
      await eachJsonLdQuad(patch.adds, this.store.addQuad.bind(this.store));

      if (this._deletedCount > 100) {
        this._vacuum();
        this._deletedCount = 0;
      }
    });
  }

  /** Closes the event stream, if any, and cancels any pending reconnect attempt. */
  close(): void {
    this._cancelReconnect();
    if (this.events) {
      this.events.close();
    }
  }

  /**
   * Probes `eventsUrl` with a credentialed HEAD request.
   *
   * @returns A promise that resolves when the request completes with any
   *   status other than 403.
   * @throws Error (as a rejection) when the response status is 403 or the
   *   request itself fails.
   */
  async testEventUrl(eventsUrl: string): Promise<void> {
    this.onStatus("testing connection");
    let response: Response;
    try {
      response = await fetch(eventsUrl, {
        method: "HEAD",
        credentials: "include",
      });
    } catch (err: unknown) {
      throw new Error(`request to ${eventsUrl} failed: ${String(err)}`);
    }
    if (response.status === 403) {
      throw new Error(`access to ${eventsUrl} is forbidden (403)`);
    }
  }

  /** Schedules one reconnect, replacing any attempt that is already pending. */
  private _scheduleReconnect(eventsUrl: string): void {
    this._cancelReconnect();
    this._retryTimer = setTimeout(() => {
      this._retryTimer = undefined;
      this._retryFrame = requestAnimationFrame(() => {
        this._retryFrame = undefined;
        this.connect(eventsUrl);
      });
    }, 3000);
  }

  /** Cancels the pending reconnect timer and animation-frame request, if any. */
  private _cancelReconnect(): void {
    if (this._retryTimer !== undefined) {
      clearTimeout(this._retryTimer);
      this._retryTimer = undefined;
    }
    if (this._retryFrame !== undefined) {
      cancelAnimationFrame(this._retryFrame);
      this._retryFrame = undefined;
    }
  }

  /** Runs `task` after every previously queued graph update has settled. */
  private _serialize(task: () => Promise<void>): Promise<void> {
    const run = this._updates.then(task);
    // Only the queue tail swallows the failure, so one bad message does not
    // block later updates; `run` still rejects for the caller.
    this._updates = run.catch(() => {});
    return run;
  }

  /** Logs a failed graph update and surfaces it through `onStatus`. */
  private _reportSyncFailure(err: unknown): void {
    console.error("graph sync failed", err);
    this.onStatus("sync failed");
  }
}