Changeset - cdfd2901918a
[Not reviewed]
default
0 4 0
drewp@bigasterisk.com - 20 months ago 2023-06-01 21:21:20
drewp@bigasterisk.com
logging
4 files changed with 9 insertions and 6 deletions:
0 comments (0 inline, 0 general)
light9/web/RdfDbChannel.ts
Show inline comments
 
@@ -36,51 +36,51 @@ export class RdfDbChannel {
 
  private connectionId: string = "none"; // server's name for us
 
  private reconnectTimer?: NodeJS.Timeout = undefined;
 
  private messagesReceived = 0; // (non-ping messages)
 
  private messagesSent = 0;
 

	
 
  newConnection: SubEvent<void> = new SubEvent();
 
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
 
  statusDisplay: SubEvent<string> = new SubEvent();
 

	
 
  constructor(public patchSenderUrl: string) {
 
    this.openConnection();
 
  }
 
  sendMessage(body: string): boolean {
 
    // one try, best effort, true if we think it worked
 
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
 
      return false;
 
    }
 
    log("send patch to server, " + body.length + " bytes");
 
    this.ws.send(body);
 
    this.messagesSent++;
 
    this.updateStatus();
 
    return true;
 
  }
 

	
 
  disconnect() {
 
  disconnect(why:string) {
 
    // will be followed by an autoconnect
 
    log("disconnect requested");
 
    log("disconnect requested:", why);
 
    if (this.ws !== undefined) {
 
      const closeHandler = this.ws.onclose?.bind(this.ws);
 
      if (!closeHandler) {
 
        throw new Error();
 
      }
 
      closeHandler(new CloseEvent("forced"));
 
    }
 
  }
 

	
 
  private openConnection() {
 
    const wsOrWss = window.location.protocol.replace("http", "ws");
 
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
 
    if (this.ws !== undefined) {
 
      this.ws.close();
 
    }
 
    this.ws = new WebSocket(fullUrl);
 
    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
 
    this.ws.onerror = this.onWsError.bind(this);
 
    this.ws.onclose = this.onWsClose.bind(this);
 
    this.ws.onmessage = this.onWsMessage.bind(this);
 
  }
 

	
 
  private onWsOpen(ws: WebSocket) {
 
    log("new connection to", this.patchSenderUrl);
 
@@ -102,49 +102,49 @@ export class RdfDbChannel {
 
    if (this.pinger) {
 
      this.pinger.pong();
 
      this.updateStatus();
 
    }
 
  }
 

	
 
  private onJson(msg: string) {
 
    const input = JSON.parse(msg);
 
    if (input.connectedAs) {
 
      this.connectionId = input.connectedAs;
 
    } else {
 
      this.onPatch(input as SyncgraphPatchMessage);
 
    }
 
  }
 

	
 
  private onPatch(input: SyncgraphPatchMessage) {
 
    log(`patch msg from server`);
 
    this.serverMessage.emit({ evType: "patch", body: input });
 
    this.messagesReceived++;
 
    this.updateStatus();
 
  }
 

	
 
  private onWsError(e: Event) {
 
    log("ws error", e);
 
    this.disconnect();
 
    this.disconnect("ws error");
 
    this.updateStatus();
 
  }
 

	
 
  private onWsClose(ev: CloseEvent) {
 
    log("ws close");
 
    this.updateStatus();
 
    if (this.reconnectTimer !== undefined) {
 
      clearTimeout(this.reconnectTimer);
 
    }
 
    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
 
  }
 

	
 
  private updateStatus() {
 
    const conn = (() => {
 
      if (this.ws === undefined) {
 
        return "no";
 
      } else {
 
        switch (this.ws.readyState) {
 
          case this.ws.CONNECTING:
 
            return "connecting";
 
          case this.ws.OPEN:
 
            return `open as ${this.connectionId}`;
 
          case this.ws.CLOSING:
 
            return "closing";
light9/web/SyncedGraph.ts
Show inline comments
 
@@ -17,49 +17,49 @@ export class SyncedGraph {
 
  private cachedFloatValues: Map<string, number> = new Map();
 
  private cachedUriValues: Map<string, N3.NamedNode> = new Map();
 
  private prefixFuncs: (prefix: string) => N3.PrefixedToIri;
 
  private serial: any;
 
  private nextNumber: any;
 
  // Main graph object for a browser to use. Consider using RdfdbSyncedGraph element to create & own
 
  // one of these. Syncs both ways with rdfdb. Meant to hide the choice of RDF lib, so we can change it
 
  // later.
 
  //
 
  // Note that _applyPatch is the only method to write to the graph, so
 
  // it can fire subscriptions.
 

	
 
  constructor(
 
    // The /syncedGraph path of an rdfdb server.
 
    patchSenderUrl: string,
 
    // prefixes can be used in Uri(curie) calls. This mapping may grow during loadTrig calls.
 
    public prefixes: Map<string, string>,
 
    private setStatus: (status: string) => void
 
  ) {
 
    this.prefixFuncs = this.rebuildPrefixFuncs(prefixes);
 
    this.graph = new N3.Store();
 
    this.autoDeps = new AutoDependencies(this);
 
    this.autoDeps.graphError.subscribe((e) => {
 
      log("graph learned of error - reconnecting", e);
 
      this.client.disconnect();
 
      this.client.disconnect("graph error");
 
    });
 
    this.clearGraph();
 

	
 
    this.client = new RdfDbClient(patchSenderUrl, this._clearGraphOnNewConnection.bind(this), this._applyPatch.bind(this), this.setStatus);
 
  }
 

	
 
  clearGraph() {
 
    // must not try send a patch to the server!
 
    // just deletes the statements; watchers are unaffected.
 
    this.cachedFloatValues = new Map(); // s + '|' + p -> number
 
    this.cachedUriValues = new Map(); // s + '|' + p -> Uri
 

	
 
    const p = patchToDeleteEntireGraph(this.graph);
 
    if (!p.isEmpty()) {
 
      this._applyPatch(p);
 
    }
 
    // if we had a Store already, this lets N3.Store free all its indices/etc
 
    this.graph = new N3.Store();
 
    this.rebuildPrefixFuncs(this.prefixes);
 
  }
 

	
 
  _clearGraphOnNewConnection() {
 
    // must not try send a patch to the server
 

	
light9/web/patch.ts
Show inline comments
 
@@ -56,48 +56,51 @@ export class Patch {
 

	
 
  isEmpty() {
 
    return Immutable.is(this.dels, this.adds);
 
  }
 

	
 
  applyToGraph(g: N3.Store) {
 
    for (let quad of this.dels) {
 
      g.removeQuad(quad);
 
    }
 
    for (let quad of this.adds) {
 
      g.addQuad(quad);
 
    }
 
  }
 

	
 
  update(other: Patch): Patch {
 
    // this is approx, since it doesnt handle cancelling existing quads.
 
    return new Patch(this.dels.union(other.dels), this.adds.union(other.adds));
 
  }
 

	
 
  summary(): string {
 
    return "-" + this.dels.size + " +" + this.adds.size;
 
  }
 

	
 
  dump(): string {
 
    if (this.dels.size + this.adds.size > 20) {
 
      return this.summary();
 
    }
 
    const lines: string[] = [];
 
    const s = (term: N3.Term): string => {
 
      if (term.termType == "Literal") return term.value;
 
      if (term.termType == "NamedNode")
 
        return term.value
 
          .replace("http://light9.bigasterisk.com/effect/", "effect:")
 
          .replace("http://light9.bigasterisk.com/", ":")
 
          .replace("http://www.w3.org/2000/01/rdf-schema#", "rdfs:")
 
          .replace("http://www.w3.org/1999/02/22-rdf-syntax-ns#", "rdf:");
 
      if (term.termType == "BlankNode") return "_:" + term.value;
 
      return term.id;
 
    };
 
    const delPrefix = "- ",
 
      addPrefix = "\u200B+ "; // dels to sort before adds
 
    this.dels.forEach((d) => lines.push(delPrefix + s(d.subject) + " " + s(d.predicate) + " " + s(d.object)));
 
    this.adds.forEach((d) => lines.push(addPrefix + s(d.subject) + " " + s(d.predicate) + " " + s(d.object)));
 
    lines.sort();
 
    return lines.join("\n") + "\n" + (this.isEmpty() ? "(empty)" : "(nonempty)");
 
  }
 

	
 
  async toJsonPatch(): Promise<string> {
 
    return new Promise((res, rej) => {
 
      const out: SyncgraphPatchMessage = { patch: { adds: "", deletes: "" } };
 

	
light9/web/rdfdbclient.ts
Show inline comments
 
@@ -23,44 +23,44 @@ export class RdfDbClient {
 
    this.channel = new RdfDbChannel(patchSenderUrl);
 
    this.channel.statusDisplay.subscribe((st: string) => {
 
      setStatus(st + `; ${this._patchesToSend.length} pending `);
 
    });
 
    this.channel.newConnection.subscribe(() => {
 
      this.clearGraphOnNewConnection();
 
    });
 
    this.channel.serverMessage.subscribe((m) => {
 
      parseJsonPatch(m.body, (p: Patch) => {
 
        log('patch from server:', p.dump())
 
        if (p.isEmpty()) {
 
          return;
 
        }
 
        this.applyPatch(p);
 
      });
 
    });
 
  }
 

	
 
  sendPatch(patch: Patch) {
 
    log("queue patch to server ", patch.summary());
 
    this._patchesToSend.push(patch);
 
    this._continueSending();
 
  }
 

	
 
  disconnect() {
 
    this.channel.disconnect();
 
  disconnect(why:string) {
 
    this.channel.disconnect(why);
 
  }
 

	
 
  async _continueSending() {
 
    // we could call this less often and coalesce patches together to optimize
 
    // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'.
 
    while (this._patchesToSend.length) {
 
      const patch = this._patchesToSend.splice(0, 1)[0];
 
      const json = await patch.toJsonPatch();
 
      const ret = this.channel.sendMessage(json);
 
      if (!ret) {
 
        setTimeout(this._continueSending.bind(this), 500);
 

	
 
        // this.disconnect()
 
        return;
 
      }
 
    }
 
  }
 
}
0 comments (0 inline, 0 general)