Mercurial > code > home > repos > streamed-graph
annotate src/SourceGraph.ts @ 130:73a70d00fb74
dep upgrades; working on build+release setup
author | drewp@bigasterisk.com |
---|---|
date | Sat, 06 May 2023 13:11:19 -0700 |
parents | 5a1a79f54779 |
children | a8939c717015 |
rev | line source |
---|---|
128 | 1 import { JsonLdParser } from "jsonld-streaming-parser"; |
2 import { Parser, Store } from "n3"; | |
3 import { SubEvent } from "sub-events"; | |
4 import { Patch } from "./Patch"; | |
5 | |
6 // Possibly-changing graph for one source. Maintains its own dataset. | |
7 // Read-only. | |
8 // | |
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? | |
10 export class SourceGraph { | |
11 store: Store; // const; do not rebuild | |
12 isCurrent: boolean = false; | |
13 sourceGraphChanged: SubEvent<Patch> = new SubEvent(); | |
14 constructor(public url: string /* immutable */) { | |
15 this.store = new Store(); | |
16 } | |
17 | |
18 dispose() {} | |
19 | |
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may | |
21 // synchronously fire change events. | |
22 // | |
23 async reloadRdf() { | |
24 const resp = await fetch(this.url); | |
25 const ctype = resp.headers.get("content-type"); | |
26 if (ctype?.startsWith("text/event-stream")) { | |
27 await this.reloadEventStream(); | |
28 } else { | |
29 await this.reloadSimpleFile(resp); | |
30 } | |
31 } | |
32 | |
33 private async reloadEventStream(): Promise<void> { | |
34 return new Promise((res, rej) => { | |
35 // todo deal with reconnects | |
36 const ev = new EventSource(this.url); | |
37 let firstPatch = true; | |
38 // clear store here? | |
39 | |
40 // maybe the event messages should be 'add' and 'del', | |
41 // for less parsing and clearer order of ops. | |
42 ev.addEventListener("patch", async (ev) => { | |
130
73a70d00fb74
dep upgrades; working on build+release setup
drewp@bigasterisk.com
parents:
128
diff
changeset
|
43 const patchMsg = JSON.parse((ev as any).data); |
128 | 44 |
45 const p = new Patch(); | |
46 | |
47 const parser = new JsonLdParser(); | |
48 parser.write(JSON.stringify(patchMsg.patch.adds)); | |
49 parser.end(); | |
50 | |
51 await p.streamImport(parser); | |
52 this.isCurrent = true; | |
53 | |
54 p.applyToStore(this.store); | |
55 console.log("patchlife0: eventsream store changed"); | |
56 this.sourceGraphChanged.emit(p); | |
57 if (firstPatch) { | |
58 firstPatch = false; | |
59 res(); | |
60 } | |
61 }); | |
62 }); | |
63 } | |
64 | |
65 private async reloadSimpleFile(resp: Response) { | |
66 const bodyText = await resp.text(); | |
67 const parser = new Parser({ format: "application/trig" }); | |
68 await new Promise<void>((res, rej) => { | |
69 parser.parse(bodyText, (error, quad, prefixes) => { | |
70 if (error) { | |
71 console.log("parse ~ error:", error); | |
72 rej(error); | |
73 return; | |
74 } | |
75 if (quad) { | |
76 this.store.addQuad(quad); // prob want to do this as one or a small number of patches | |
77 } else { | |
78 res(); | |
79 } | |
80 }); | |
81 }); | |
82 this.isCurrent = true; | |
83 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance. | |
84 console.log("patchlife0: simple file store changed"); | |
85 this.sourceGraphChanged.emit(new Patch()); | |
86 } | |
87 | |
88 quadCount(): number { | |
89 return this.store.countQuads(null, null, null, null); | |
90 } | |
91 } |