128
|
1 import { JsonLdParser } from "jsonld-streaming-parser";
|
|
2 import { Parser, Store } from "n3";
|
|
3 import { SubEvent } from "sub-events";
|
|
4 import { Patch } from "./Patch";
|
|
5
|
|
6 // Possibly-changing graph for one source. Maintains its own dataset.
|
|
7 // Read-only.
|
|
8 //
|
|
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph?
|
|
10 export class SourceGraph {
|
|
11 store: Store; // const; do not rebuild
|
|
12 isCurrent: boolean = false;
|
|
13 sourceGraphChanged: SubEvent<Patch> = new SubEvent();
|
|
14 constructor(public url: string /* immutable */) {
|
|
15 this.store = new Store();
|
|
16 }
|
|
17
|
|
18 dispose() {}
|
|
19
|
|
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may
|
|
21 // synchronously fire change events.
|
|
22 //
|
|
23 async reloadRdf() {
|
|
24 const resp = await fetch(this.url);
|
|
25 const ctype = resp.headers.get("content-type");
|
|
26 if (ctype?.startsWith("text/event-stream")) {
|
|
27 await this.reloadEventStream();
|
|
28 } else {
|
|
29 await this.reloadSimpleFile(resp);
|
|
30 }
|
|
31 }
|
|
32
|
|
33 private async reloadEventStream(): Promise<void> {
|
|
34 return new Promise((res, rej) => {
|
|
35 // todo deal with reconnects
|
|
36 const ev = new EventSource(this.url);
|
|
37 let firstPatch = true;
|
|
38 // clear store here?
|
|
39
|
|
40 // maybe the event messages should be 'add' and 'del',
|
|
41 // for less parsing and clearer order of ops.
|
|
42 ev.addEventListener("patch", async (ev) => {
|
|
43 const patchMsg = JSON.parse(ev.data);
|
|
44
|
|
45 const p = new Patch();
|
|
46
|
|
47 const parser = new JsonLdParser();
|
|
48 parser.write(JSON.stringify(patchMsg.patch.adds));
|
|
49 parser.end();
|
|
50
|
|
51 await p.streamImport(parser);
|
|
52 this.isCurrent = true;
|
|
53
|
|
54 p.applyToStore(this.store);
|
|
55 console.log("patchlife0: eventsream store changed");
|
|
56 this.sourceGraphChanged.emit(p);
|
|
57 if (firstPatch) {
|
|
58 firstPatch = false;
|
|
59 res();
|
|
60 }
|
|
61 });
|
|
62 });
|
|
63 }
|
|
64
|
|
65 private async reloadSimpleFile(resp: Response) {
|
|
66 const bodyText = await resp.text();
|
|
67 const parser = new Parser({ format: "application/trig" });
|
|
68 await new Promise<void>((res, rej) => {
|
|
69 parser.parse(bodyText, (error, quad, prefixes) => {
|
|
70 if (error) {
|
|
71 console.log("parse ~ error:", error);
|
|
72 rej(error);
|
|
73 return;
|
|
74 }
|
|
75 if (quad) {
|
|
76 this.store.addQuad(quad); // prob want to do this as one or a small number of patches
|
|
77 } else {
|
|
78 res();
|
|
79 }
|
|
80 });
|
|
81 });
|
|
82 this.isCurrent = true;
|
|
83 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance.
|
|
84 console.log("patchlife0: simple file store changed");
|
|
85 this.sourceGraphChanged.emit(new Patch());
|
|
86 }
|
|
87
|
|
88 quadCount(): number {
|
|
89 return this.store.countQuads(null, null, null, null);
|
|
90 }
|
|
91 }
|