Mercurial > code > home > repos > streamed-graph
annotate src/SourceGraph.ts @ 137:a8939c717015
still trying to get build right, but there are other update bugs too
author | drewp@bigasterisk.com |
---|---|
date | Sat, 06 May 2023 15:34:40 -0700 |
parents | 73a70d00fb74 |
children | cf642d395be4 |
rev | line source |
---|---|
128 | 1 import { JsonLdParser } from "jsonld-streaming-parser"; |
2 import { Parser, Store } from "n3"; | |
3 import { SubEvent } from "sub-events"; | |
4 import { Patch } from "./Patch"; | |
5 | |
6 // Possibly-changing graph for one source. Maintains its own dataset. | |
7 // Read-only. | |
8 // | |
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? | |
10 export class SourceGraph { | |
11 store: Store; // const; do not rebuild | |
12 isCurrent: boolean = false; | |
13 sourceGraphChanged: SubEvent<Patch> = new SubEvent(); | |
14 constructor(public url: string /* immutable */) { | |
15 this.store = new Store(); | |
16 } | |
17 | |
18 dispose() {} | |
19 | |
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may | |
21 // synchronously fire change events. | |
22 // | |
23 async reloadRdf() { | |
24 const resp = await fetch(this.url); | |
25 const ctype = resp.headers.get("content-type"); | |
26 if (ctype?.startsWith("text/event-stream")) { | |
27 await this.reloadEventStream(); | |
28 } else { | |
29 await this.reloadSimpleFile(resp); | |
30 } | |
31 } | |
32 | |
33 private async reloadEventStream(): Promise<void> { | |
34 return new Promise((res, rej) => { | |
35 // todo deal with reconnects | |
36 const ev = new EventSource(this.url); | |
37 let firstPatch = true; | |
38 // clear store here? | |
39 | |
40 // maybe the event messages should be 'add' and 'del', | |
41 // for less parsing and clearer order of ops. | |
42 ev.addEventListener("patch", async (ev) => { | |
137
a8939c717015
still trying to get build right, but there are other update bugs too
drewp@bigasterisk.com
parents:
130
diff
changeset
|
43 // this is reentrant- ok? |
a8939c717015
still trying to get build right, but there are other update bugs too
drewp@bigasterisk.com
parents:
130
diff
changeset
|
44 |
130
73a70d00fb74
dep upgrades; working on build+release setup
drewp@bigasterisk.com
parents:
128
diff
changeset
|
45 const patchMsg = JSON.parse((ev as any).data); |
128 | 46 |
47 const p = new Patch(); | |
48 | |
49 const parser = new JsonLdParser(); | |
50 parser.write(JSON.stringify(patchMsg.patch.adds)); | |
51 parser.end(); | |
52 | |
53 await p.streamImport(parser); | |
54 this.isCurrent = true; | |
55 | |
56 p.applyToStore(this.store); | |
57 console.log("patchlife0: eventsream store changed"); | |
137
a8939c717015
still trying to get build right, but there are other update bugs too
drewp@bigasterisk.com
parents:
130
diff
changeset
|
58 // firing before there are listeners |
128 | 59 this.sourceGraphChanged.emit(p); |
60 if (firstPatch) { | |
61 firstPatch = false; | |
62 res(); | |
63 } | |
64 }); | |
65 }); | |
66 } | |
67 | |
68 private async reloadSimpleFile(resp: Response) { | |
69 const bodyText = await resp.text(); | |
70 const parser = new Parser({ format: "application/trig" }); | |
71 await new Promise<void>((res, rej) => { | |
72 parser.parse(bodyText, (error, quad, prefixes) => { | |
73 if (error) { | |
74 console.log("parse ~ error:", error); | |
75 rej(error); | |
76 return; | |
77 } | |
78 if (quad) { | |
79 this.store.addQuad(quad); // prob want to do this as one or a small number of patches | |
80 } else { | |
81 res(); | |
82 } | |
83 }); | |
84 }); | |
85 this.isCurrent = true; | |
86 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. | |
87 console.log("patchlife0: simple file store changed"); | |
88 this.sourceGraphChanged.emit(new Patch()); | |
89 } | |
90 | |
91 quadCount(): number { | |
92 return this.store.countQuads(null, null, null, null); | |
93 } | |
94 } |