Mercurial > code > home > repos > streamed-graph
comparison src/SourceGraph.ts @ 139:cf642d395be4
new simpler Patch class; fancier 'hide' view config support
author | drewp@bigasterisk.com |
---|---|
date | Mon, 08 May 2023 13:05:20 -0700 |
parents | a8939c717015 |
children |
comparison
equal
deleted
inserted
replaced
138:ea0b4e46de2b | 139:cf642d395be4 |
---|---|
1 import { JsonLdParser } from "jsonld-streaming-parser"; | 1 import { JsonLdParser } from "jsonld-streaming-parser"; |
2 import { Parser, Store } from "n3"; | 2 import { Parser, Store } from "n3"; |
3 import { SubEvent } from "sub-events"; | 3 import { SubEvent } from "sub-events"; |
4 import { Patch } from "./Patch"; | 4 import { Patch, PatchDirection } from "./Patch"; |
5 | |
6 type JsonLdData = Object; | |
7 | |
8 interface CombinedPatchBody { | |
9 adds?: JsonLdData; | |
10 deletes?: JsonLdData; | |
11 } | |
12 interface CombinedPatchEvent { | |
13 patch: CombinedPatchBody; | |
14 } | |
5 | 15 |
6 // Possibly-changing graph for one source. Maintains its own dataset. | 16 // Possibly-changing graph for one source. Maintains its own dataset. |
7 // Read-only. | 17 // Read-only. |
8 // | 18 // |
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? | 19 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? |
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may | 30 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may |
21 // synchronously fire change events. | 31 // synchronously fire change events. |
22 // | 32 // |
23 async reloadRdf() { | 33 async reloadRdf() { |
24 const resp = await fetch(this.url); | 34 const resp = await fetch(this.url); |
35 this.clearStore(); | |
25 const ctype = resp.headers.get("content-type"); | 36 const ctype = resp.headers.get("content-type"); |
26 if (ctype?.startsWith("text/event-stream")) { | 37 if (ctype?.startsWith("text/event-stream")) { |
27 await this.reloadEventStream(); | 38 await this.reloadEventStream(); |
28 } else { | 39 } else { |
29 await this.reloadSimpleFile(resp); | 40 await this.reloadSimpleFile(resp); |
30 } | 41 } |
31 } | 42 } |
32 | 43 |
44 private clearStore() { | |
45 this.store.removeMatches(null, null, null, null); | |
46 } | |
47 | |
48 async makePatchFromParsed( | |
49 dir: PatchDirection, | |
50 jsonLdObj: Object | undefined | |
51 ): Promise<Patch> { | |
52 const p = new Patch(dir); | |
53 if (jsonLdObj === undefined) { | |
54 return p; | |
55 } | |
56 const parser = new JsonLdParser(); | |
57 | |
58 parser.write(JSON.stringify(jsonLdObj)); | |
59 parser.end(); | |
60 | |
61 await p.streamImport(parser); | |
62 return p; | |
63 } | |
64 | |
65 private applyPatch(p: Patch) { | |
66 if (p.isEmpty()){ | |
67 return; | |
68 } | |
69 p.applyToStore(this.store); | |
70 if (this.sourceGraphChanged.getStat().unnamed == 0) { | |
71 console.warn("no listeners to this sourceGraphChanged event"); | |
72 } | |
73 this.sourceGraphChanged.emit(p); | |
74 } | |
75 | |
33 private async reloadEventStream(): Promise<void> { | 76 private async reloadEventStream(): Promise<void> { |
34 return new Promise((res, rej) => { | 77 return new Promise((res, rej) => { |
35 // todo deal with reconnects | |
36 const ev = new EventSource(this.url); | 78 const ev = new EventSource(this.url); |
37 let firstPatch = true; | |
38 // clear store here? | |
39 | 79 |
40 // maybe the event messages should be 'add' and 'del', | 80 // old-style fullGraph. I now think it would be better to send plain |
41 // for less parsing and clearer order of ops. | 81 // adds and for this SourceGraph to emptyGraph when there's a new connection. |
42 ev.addEventListener("patch", async (ev) => { | 82 ev.addEventListener("fullGraph", async (ev) => { |
43 // this is reentrant- ok? | 83 this.clearStore(); |
44 | 84 const p = new Patch(PatchDirection.ADD); |
45 const patchMsg = JSON.parse((ev as any).data); | |
46 | |
47 const p = new Patch(); | |
48 | |
49 const parser = new JsonLdParser(); | 85 const parser = new JsonLdParser(); |
50 parser.write(JSON.stringify(patchMsg.patch.adds)); | 86 parser.write(ev.data); |
51 parser.end(); | 87 parser.end(); |
52 | 88 |
53 await p.streamImport(parser); | 89 await p.streamImport(parser); |
90 p.applyToStore(this.store); | |
54 this.isCurrent = true; | 91 this.isCurrent = true; |
92 this.sourceGraphChanged.emit(p); | |
93 res(); | |
94 }); | |
55 | 95 |
56 p.applyToStore(this.store); | 96 // this is for old-style dels-then-adds patches |
57 console.log("patchlife0: eventsream store changed"); | 97 ev.addEventListener("patch", async (ev) => { |
58 // firing before there are listeners | 98 // this is reentrant- patches might be able to get applied out of order! |
59 this.sourceGraphChanged.emit(p); | 99 |
60 if (firstPatch) { | 100 const patchMsg: CombinedPatchEvent = JSON.parse((ev as any).data); |
61 firstPatch = false; | 101 |
62 res(); | 102 const p0 = await this.makePatchFromParsed( |
63 } | 103 PatchDirection.DEL, |
104 patchMsg.patch.deletes | |
105 ); | |
106 const p1 = await this.makePatchFromParsed( | |
107 PatchDirection.ADD, | |
108 patchMsg.patch.adds | |
109 ); | |
110 | |
111 this.applyPatch(p0); | |
112 this.applyPatch(p1); | |
113 | |
114 this.isCurrent = true; | |
64 }); | 115 }); |
116 // here, add listeners for new-style add/del patches | |
65 }); | 117 }); |
66 } | 118 } |
67 | 119 |
68 private async reloadSimpleFile(resp: Response) { | 120 private async reloadSimpleFile(resp: Response) { |
69 const bodyText = await resp.text(); | 121 const bodyText = await resp.text(); |
122 const p = new Patch(PatchDirection.ADD); | |
70 const parser = new Parser({ format: "application/trig" }); | 123 const parser = new Parser({ format: "application/trig" }); |
71 await new Promise<void>((res, rej) => { | 124 await new Promise<void>((res, rej) => { |
72 parser.parse(bodyText, (error, quad, prefixes) => { | 125 parser.parse(bodyText, (error, quad, prefixes) => { |
73 if (error) { | 126 if (error) { |
74 console.log("parse ~ error:", error); | 127 console.log("parse ~ error:", error); |
75 rej(error); | 128 rej(error); |
76 return; | 129 return; |
77 } | 130 } |
78 if (quad) { | 131 if (quad) { |
79 this.store.addQuad(quad); // prob want to do this as one or a small number of patches | 132 p.quads.push(quad); |
80 } else { | 133 } else { |
81 res(); | 134 res(); |
82 } | 135 } |
83 }); | 136 }); |
84 }); | 137 }); |
138 this.applyPatch(p); | |
85 this.isCurrent = true; | 139 this.isCurrent = true; |
86 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. | |
87 console.log("patchlife0: simple file store changed"); | |
88 this.sourceGraphChanged.emit(new Patch()); | |
89 } | 140 } |
90 | 141 |
91 quadCount(): number { | 142 quadCount(): number { |
92 return this.store.countQuads(null, null, null, null); | 143 return this.store.countQuads(null, null, null, null); |
93 } | 144 } |