Mercurial > code > home > repos > streamed-graph
comparison src/SourceGraph.ts @ 128:5a1a79f54779
big rewrite
author | drewp@bigasterisk.com |
---|---|
date | Fri, 05 May 2023 21:26:36 -0700 |
parents | |
children | 73a70d00fb74 |
comparison
equal
deleted
inserted
replaced
127:d2580faef057 | 128:5a1a79f54779 |
---|---|
1 import { JsonLdParser } from "jsonld-streaming-parser"; | |
2 import { Parser, Store } from "n3"; | |
3 import { SubEvent } from "sub-events"; | |
4 import { Patch } from "./Patch"; | |
5 | |
6 // Possibly-changing graph for one source. Maintains its own dataset. | |
7 // Read-only. | |
8 // | |
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? | |
/**
 * Lifecycle: subscribe to `sourceGraphChanged`, call `reloadRdf()`, and call
 * `dispose()` when the graph is no longer needed so that any open event
 * stream is closed.
 */
export class SourceGraph {
  store: Store; // const; do not rebuild
  /** Set to true once a simple file has been loaded or an event-stream patch has been imported. */
  isCurrent: boolean = false;
  /** Fires after `store` has been modified by this object. */
  sourceGraphChanged: SubEvent<Patch> = new SubEvent();
  /** The live event stream, if `url` turned out to be a `text/event-stream`. */
  private eventSource: EventSource | undefined;

  constructor(public url: string /* immutable */) {
    this.store = new Store();
  }

  /** Closes the event stream (if one is open). Safe to call more than once. */
  dispose() {
    this.closeEventSource();
  }

  /**
   * Call this first, after you've subscribed to `sourceGraphChanged`. This call may
   * synchronously fire change events.
   *
   * Fetches `url` and, depending on the response content type, either
   * subscribes to it as an event stream of patches or parses it as one
   * TriG document.
   *
   * @throws Error if the HTTP response status is not 2xx.
   */
  async reloadRdf() {
    const resp = await fetch(this.url);
    if (!resp.ok) {
      // Without this check an error page with an empty body would parse as an
      // empty graph and be reported as current.
      throw new Error(
        `fetching ${this.url} failed: ${resp.status} ${resp.statusText}`
      );
    }
    const ctype = resp.headers.get("content-type");
    if (ctype?.startsWith("text/event-stream")) {
      // This response was only a probe for the content type. An event-stream
      // body never ends by itself, so release it before EventSource opens its
      // own connection.
      try {
        await resp.body?.cancel();
      } catch {
        // Nothing useful to do if the probe body can't be cancelled.
      }
      await this.reloadEventStream();
    } else {
      await this.reloadSimpleFile(resp);
    }
  }

  /**
   * Opens an EventSource on `url` and applies every "patch" event to `store`.
   *
   * The returned promise resolves after the first patch has been applied. It
   * rejects if the first patch can't be processed, or if the connection
   * fails permanently before any patch arrived.
   */
  private reloadEventStream(): Promise<void> {
    // Never keep two streams open for the same graph.
    this.closeEventSource();

    return new Promise<void>((res, rej) => {
      // todo deal with reconnects
      const source = new EventSource(this.url);
      this.eventSource = source;
      let awaitingFirstPatch = true;
      // clear store here?

      // Patches are imported asynchronously; chaining them keeps them applied
      // in arrival order.
      let pending: Promise<void> = Promise.resolve();

      source.addEventListener("error", () => {
        // EventSource retries by itself while readyState is CONNECTING; only a
        // CLOSED source is a permanent failure.
        if (awaitingFirstPatch && source.readyState === EventSource.CLOSED) {
          awaitingFirstPatch = false;
          rej(new Error(`event stream ${this.url} failed to connect`));
        }
      });

      // maybe the event messages should be 'add' and 'del',
      // for less parsing and clearer order of ops.
      source.addEventListener("patch", (msg) => {
        pending = pending.then(async () => {
          if (this.eventSource !== source) {
            // Disposed or replaced while this patch was queued.
            return;
          }
          try {
            await this.applyPatchMessage(msg.data);
          } catch (err: unknown) {
            console.log("patch ~ error:", err);
            if (awaitingFirstPatch) {
              awaitingFirstPatch = false;
              rej(err);
            }
            return;
          }
          if (awaitingFirstPatch) {
            awaitingFirstPatch = false;
            res();
          }
        });
      });
    });
  }

  /**
   * Parses one "patch" event payload, applies it to `store`, and emits it on
   * `sourceGraphChanged`.
   *
   * NOTE(review): only `patch.adds` is read from the message; if the server
   * also sends deletions they are not applied here -- confirm.
   */
  private async applyPatchMessage(data: string): Promise<void> {
    const patchMsg = JSON.parse(data);

    const p = new Patch();

    const parser = new JsonLdParser();
    parser.write(JSON.stringify(patchMsg.patch.adds));
    parser.end();

    await p.streamImport(parser);
    this.isCurrent = true;

    p.applyToStore(this.store);
    console.log("patchlife0: eventsream store changed");
    this.sourceGraphChanged.emit(p);
  }

  /**
   * Parses the response body as TriG, adds every quad to `store`, then emits
   * a single (empty) patch on `sourceGraphChanged`.
   *
   * Rejects with the parser's error if the body is not valid TriG.
   */
  private async reloadSimpleFile(resp: Response) {
    const bodyText = await resp.text();
    const parser = new Parser({ format: "application/trig" });
    await new Promise<void>((res, rej) => {
      parser.parse(bodyText, (error, quad, prefixes) => {
        if (error) {
          console.log("parse ~ error:", error);
          rej(error);
          return;
        }
        if (quad) {
          this.store.addQuad(quad); // prob want to do this as one or a small number of patches
        } else {
          // The callback is invoked without a quad once parsing is finished.
          res();
        }
      });
    });
    this.isCurrent = true;
    // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance.
    console.log("patchlife0: simple file store changed");
    this.sourceGraphChanged.emit(new Patch());
  }

  /** Number of quads currently in `store`, across all graphs. */
  quadCount(): number {
    return this.store.countQuads(null, null, null, null);
  }

  /** Closes and forgets the current EventSource, if any. */
  private closeEventSource(): void {
    this.eventSource?.close();
    this.eventSource = undefined;
  }
}