view src/SourceGraph.ts @ 128:5a1a79f54779

big rewrite
author drewp@bigasterisk.com
date Fri, 05 May 2023 21:26:36 -0700
parents
children 73a70d00fb74
line wrap: on
line source

import { JsonLdParser } from "jsonld-streaming-parser";
import { Parser, Store } from "n3";
import { SubEvent } from "sub-events";
import { Patch } from "./Patch";

// Possibly-changing graph for one source. Maintains its own dataset.
// Read-only.
//
// Rename to RemoteGraph? RemoteStore? SyncedStore? PatchableGraph?
export class SourceGraph {
  store: Store; // const; do not rebuild
  isCurrent: boolean = false;
  sourceGraphChanged: SubEvent<Patch> = new SubEvent();
  constructor(public url: string /* immutable */) {
    this.store = new Store();
  }

  dispose() {}

  // Call this first, after you've subscribed to `sourceGraphChanged`. This call may
  // synchronously fire change events.
  //
  async reloadRdf() {
    const resp = await fetch(this.url);
    const ctype = resp.headers.get("content-type");
    if (ctype?.startsWith("text/event-stream")) {
      await this.reloadEventStream();
    } else {
      await this.reloadSimpleFile(resp);
    }
  }

  private async reloadEventStream(): Promise<void> {
    return new Promise((res, rej) => {
      //  todo deal with reconnects
      const ev = new EventSource(this.url);
      let firstPatch = true;
      // clear store here?

      // maybe the event messages should be 'add' and 'del',
      // for less parsing and clearer order of ops.
      ev.addEventListener("patch", async (ev) => {
        const patchMsg = JSON.parse(ev.data);

        const p = new Patch();

        const parser = new JsonLdParser();
        parser.write(JSON.stringify(patchMsg.patch.adds));
        parser.end();

        await p.streamImport(parser);
        this.isCurrent = true;

        p.applyToStore(this.store);
        console.log("patchlife0: eventsream store changed");
        this.sourceGraphChanged.emit(p);
        if (firstPatch) {
          firstPatch = false;
          res();
        }
      });
    });
  }

  private async reloadSimpleFile(resp: Response) {
    const bodyText = await resp.text();
    const parser = new Parser({ format: "application/trig" });
    await new Promise<void>((res, rej) => {
      parser.parse(bodyText, (error, quad, prefixes) => {
        if (error) {
          console.log("parse ~ error:", error);
          rej(error);
          return;
        }
        if (quad) {
          this.store.addQuad(quad); // prob want to do this as one or a small number of patches
        } else {
          res();
        }
      });
    });
    this.isCurrent = true;
    // this may have to fire per addQuad call for correctness, or else we batch the adds where no readers can see them in advance.
    console.log("patchlife0: simple file store changed");
    this.sourceGraphChanged.emit(new Patch());
  }

  quadCount(): number {
    return this.store.countQuads(null, null, null, null);
  }
}