comparison src/SourceGraph.ts @ 139:cf642d395be4

new simpler Patch class; fancier 'hide' view config support
author drewp@bigasterisk.com
date Mon, 08 May 2023 13:05:20 -0700
parents a8939c717015
children
comparison
equal deleted inserted replaced
138:ea0b4e46de2b 139:cf642d395be4
1 import { JsonLdParser } from "jsonld-streaming-parser"; 1 import { JsonLdParser } from "jsonld-streaming-parser";
2 import { Parser, Store } from "n3"; 2 import { Parser, Store } from "n3";
3 import { SubEvent } from "sub-events"; 3 import { SubEvent } from "sub-events";
4 import { Patch } from "./Patch"; 4 import { Patch, PatchDirection } from "./Patch";
5
6 type JsonLdData = Object;
7
8 interface CombinedPatchBody {
9 adds?: JsonLdData;
10 deletes?: JsonLdData;
11 }
12 interface CombinedPatchEvent {
13 patch: CombinedPatchBody;
14 }
5 15
6 // Possibly-changing graph for one source. Maintains its own dataset. 16 // Possibly-changing graph for one source. Maintains its own dataset.
7 // Read-only. 17 // Read-only.
8 // 18 //
9 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph? 19 // Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph?
20 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may 30 // Call this first, after you've subscribed to `sourceGraphChanged`. This call may
21 // synchronously fire change events. 31 // synchronously fire change events.
22 // 32 //
23 async reloadRdf() { 33 async reloadRdf() {
24 const resp = await fetch(this.url); 34 const resp = await fetch(this.url);
35 this.clearStore();
25 const ctype = resp.headers.get("content-type"); 36 const ctype = resp.headers.get("content-type");
26 if (ctype?.startsWith("text/event-stream")) { 37 if (ctype?.startsWith("text/event-stream")) {
27 await this.reloadEventStream(); 38 await this.reloadEventStream();
28 } else { 39 } else {
29 await this.reloadSimpleFile(resp); 40 await this.reloadSimpleFile(resp);
30 } 41 }
31 } 42 }
32 43
44 private clearStore() {
45 this.store.removeMatches(null, null, null, null);
46 }
47
48 async makePatchFromParsed(
49 dir: PatchDirection,
50 jsonLdObj: Object | undefined
51 ): Promise<Patch> {
52 const p = new Patch(dir);
53 if (jsonLdObj === undefined) {
54 return p;
55 }
56 const parser = new JsonLdParser();
57
58 parser.write(JSON.stringify(jsonLdObj));
59 parser.end();
60
61 await p.streamImport(parser);
62 return p;
63 }
64
65 private applyPatch(p: Patch) {
66 if (p.isEmpty()){
67 return;
68 }
69 p.applyToStore(this.store);
70 if (this.sourceGraphChanged.getStat().unnamed == 0) {
71 console.warn("no listeners to this sourceGraphChanged event");
72 }
73 this.sourceGraphChanged.emit(p);
74 }
75
33 private async reloadEventStream(): Promise<void> { 76 private async reloadEventStream(): Promise<void> {
34 return new Promise((res, rej) => { 77 return new Promise((res, rej) => {
35 // todo deal with reconnects
36 const ev = new EventSource(this.url); 78 const ev = new EventSource(this.url);
37 let firstPatch = true;
38 // clear store here?
39 79
40 // maybe the event messages should be 'add' and 'del', 80 // old-style fullGraph. I now think it would be better to send plain
41 // for less parsing and clearer order of ops. 81 // adds and for this SourceGraph to emptyGraph when there's a new connection.
42 ev.addEventListener("patch", async (ev) => { 82 ev.addEventListener("fullGraph", async (ev) => {
43 // this is reentrant- ok? 83 this.clearStore();
44 84 const p = new Patch(PatchDirection.ADD);
45 const patchMsg = JSON.parse((ev as any).data);
46
47 const p = new Patch();
48
49 const parser = new JsonLdParser(); 85 const parser = new JsonLdParser();
50 parser.write(JSON.stringify(patchMsg.patch.adds)); 86 parser.write(ev.data);
51 parser.end(); 87 parser.end();
52 88
53 await p.streamImport(parser); 89 await p.streamImport(parser);
90 p.applyToStore(this.store);
54 this.isCurrent = true; 91 this.isCurrent = true;
92 this.sourceGraphChanged.emit(p);
93 res();
94 });
55 95
56 p.applyToStore(this.store); 96 // this is for old-style dels-then-adds patches
57 console.log("patchlife0: eventsream store changed"); 97 ev.addEventListener("patch", async (ev) => {
58 // firing before there are listeners 98 // this is reentrant- patches might be able to get applied out of order!
59 this.sourceGraphChanged.emit(p); 99
60 if (firstPatch) { 100 const patchMsg: CombinedPatchEvent = JSON.parse((ev as any).data);
61 firstPatch = false; 101
62 res(); 102 const p0 = await this.makePatchFromParsed(
63 } 103 PatchDirection.DEL,
104 patchMsg.patch.deletes
105 );
106 const p1 = await this.makePatchFromParsed(
107 PatchDirection.ADD,
108 patchMsg.patch.adds
109 );
110
111 this.applyPatch(p0);
112 this.applyPatch(p1);
113
114 this.isCurrent = true;
64 }); 115 });
116 // here, add listeners for new-style add/del patches
65 }); 117 });
66 } 118 }
67 119
68 private async reloadSimpleFile(resp: Response) { 120 private async reloadSimpleFile(resp: Response) {
69 const bodyText = await resp.text(); 121 const bodyText = await resp.text();
122 const p = new Patch(PatchDirection.ADD);
70 const parser = new Parser({ format: "application/trig" }); 123 const parser = new Parser({ format: "application/trig" });
71 await new Promise<void>((res, rej) => { 124 await new Promise<void>((res, rej) => {
72 parser.parse(bodyText, (error, quad, prefixes) => { 125 parser.parse(bodyText, (error, quad, prefixes) => {
73 if (error) { 126 if (error) {
74 console.log("parse ~ error:", error); 127 console.log("parse ~ error:", error);
75 rej(error); 128 rej(error);
76 return; 129 return;
77 } 130 }
78 if (quad) { 131 if (quad) {
79 this.store.addQuad(quad); // prob want to do this as one or a small number of patches 132 p.quads.push(quad);
80 } else { 133 } else {
81 res(); 134 res();
82 } 135 }
83 }); 136 });
84 }); 137 });
138 this.applyPatch(p);
85 this.isCurrent = true; 139 this.isCurrent = true;
86 // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance.
87 console.log("patchlife0: simple file store changed");
88 this.sourceGraphChanged.emit(new Patch());
89 } 140 }
90 141
91 quadCount(): number { 142 quadCount(): number {
92 return this.store.countQuads(null, null, null, null); 143 return this.store.countQuads(null, null, null, null);
93 } 144 }