comparison src/SourceGraph.ts @ 128:5a1a79f54779

big rewrite
author drewp@bigasterisk.com
date Fri, 05 May 2023 21:26:36 -0700
parents
children 73a70d00fb74
comparison
equal deleted inserted replaced
127:d2580faef057 128:5a1a79f54779
1 import { JsonLdParser } from "jsonld-streaming-parser";
2 import { Parser, Store } from "n3";
3 import { SubEvent } from "sub-events";
4 import { Patch } from "./Patch";
5
6 // Possibly-changing graph for one source. Maintains its own dataset.
7 // Read-only.
8 //
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph?
10 export class SourceGraph {
11 store: Store; // const; do not rebuild
12 isCurrent: boolean = false;
13 sourceGraphChanged: SubEvent<Patch> = new SubEvent();
14 constructor(public url: string /* immutable */) {
15 this.store = new Store();
16 }
17
18 dispose() {}
19
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may
21 // synchronously fire change events.
22 //
23 async reloadRdf() {
24 const resp = await fetch(this.url);
25 const ctype = resp.headers.get("content-type");
26 if (ctype?.startsWith("text/event-stream")) {
27 await this.reloadEventStream();
28 } else {
29 await this.reloadSimpleFile(resp);
30 }
31 }
32
33 private async reloadEventStream(): Promise<void> {
34 return new Promise((res, rej) => {
35 // todo deal with reconnects
36 const ev = new EventSource(this.url);
37 let firstPatch = true;
38 // clear store here?
39
40 // maybe the event messages should be 'add' and 'del',
41 // for less parsing and clearer order of ops.
42 ev.addEventListener("patch", async (ev) => {
43 const patchMsg = JSON.parse(ev.data);
44
45 const p = new Patch();
46
47 const parser = new JsonLdParser();
48 parser.write(JSON.stringify(patchMsg.patch.adds));
49 parser.end();
50
51 await p.streamImport(parser);
52 this.isCurrent = true;
53
54 p.applyToStore(this.store);
55 console.log("patchlife0: eventsream store changed");
56 this.sourceGraphChanged.emit(p);
57 if (firstPatch) {
58 firstPatch = false;
59 res();
60 }
61 });
62 });
63 }
64
65 private async reloadSimpleFile(resp: Response) {
66 const bodyText = await resp.text();
67 const parser = new Parser({ format: "application/trig" });
68 await new Promise<void>((res, rej) => {
69 parser.parse(bodyText, (error, quad, prefixes) => {
70 if (error) {
71 console.log("parse ~ error:", error);
72 rej(error);
73 return;
74 }
75 if (quad) {
76 this.store.addQuad(quad); // prob want to do this as one or a small number of patches
77 } else {
78 res();
79 }
80 });
81 });
82 this.isCurrent = true;
83 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance.
84 console.log("patchlife0: simple file store changed");
85 this.sourceGraphChanged.emit(new Patch());
86 }
87
88 quadCount(): number {
89 return this.store.countQuads(null, null, null, null);
90 }
91 }