Changeset - ac55319a2eac
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 8 months ago 2024-05-21 23:10:39
drewp@bigasterisk.com
don't drop patches that arrive before we get WS connected
3 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
web/RdfDbChannel.ts
Show inline comments
 
import debug from "debug";
 
import { SubEvent } from "sub-events";
 
import { SyncgraphPatchMessage } from "./patch";
 
const log = debug("rdfdbclient");
 

	
 
class ChannelPinger {
 
  private timeoutId?: number;
 
  private lastMs: number = 0;
 
  constructor(private ws: WebSocket) {
 
    this._pingLoop();
 
  }
 
  lastPingMs(): number {
 
    return this.lastMs;
 
  }
 
  pong() {
 
    this.lastMs = Date.now() + this.lastMs;
 
  }
 
  _pingLoop() {
 
    if (this.ws.readyState !== this.ws.OPEN) {
 
      return;
 
    }
 
    this.ws.send("PING");
 
    this.lastMs = -Date.now();
 

	
 
    if (this.timeoutId != null) {
 
      clearTimeout(this.timeoutId);
 
    }
 
    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
 
  }
 
}
 

	
 
export class RdfDbChannel {
 
  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
 
  private ws?: WebSocket = undefined;
 
  private pinger?: ChannelPinger;
 
  private connectionId: string = "none"; // server's name for us
 
  private reconnectTimer?: NodeJS.Timeout = undefined;
 
  private messagesReceived = 0; // (non-ping messages)
 
  private messagesSent = 0;
 

	
 
  newConnection: SubEvent<void> = new SubEvent();
 
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
 
  statusDisplay: SubEvent<string> = new SubEvent();
 

	
 
  constructor(public patchSenderUrl: string) {
 
    this.openConnection();
 
  }
 
  sendMessage(body: string): boolean {
 
    // one try, best effort, true if we think it worked
 
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
 
      log("dropping message: " + body);
 
      return false;
 
    }
 
    log("send patch to server, " + body.length + " bytes");
 
    this.ws.send(body);
 
    this.messagesSent++;
 
    this.updateStatus();
 
    return true;
 
  }
 

	
 
  disconnect(why:string) {
 
    // will be followed by an autoconnect
 
    log("disconnect requested:", why);
 
    if (this.ws !== undefined) {
 
      const closeHandler = this.ws.onclose?.bind(this.ws);
 
      if (!closeHandler) {
 
        throw new Error();
 
      }
 
      closeHandler(new CloseEvent("forced"));
 
    }
 
  }
 

	
 
  private openConnection() {
 
    const wsOrWss = window.location.protocol.replace("http", "ws");
 
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
 
    if (this.ws !== undefined) {
 
      this.ws.close();
 
    }
 
    this.ws = new WebSocket(fullUrl);
 
    this.ws.onopen = this.onWsOpen.bind(this, this.ws);
 
    this.ws.onerror = this.onWsError.bind(this);
 
    this.ws.onclose = this.onWsClose.bind(this);
 
    this.ws.onmessage = this.onWsMessage.bind(this);
 
  }
 

	
 
  private onWsOpen(ws: WebSocket) {
 
    log("new connection to", this.patchSenderUrl);
 
    this.updateStatus();
 
    this.newConnection.emit();
 
    this.pinger = new ChannelPinger(ws);
 
  }
 

	
 
  private onWsMessage(evt: { data: string }) {
 
    const msg = evt.data;
 
    if (msg === "PONG") {
 
      this.onPong();
 
      return;
 
    }
 
    this.onJson(msg);
 
  }
 

	
 
  private onPong() {
 
    if (this.pinger) {
 
      this.pinger.pong();
 
      this.updateStatus();
 
    }
 
  }
 

	
 
  private onJson(msg: string) {
 
    const input = JSON.parse(msg);
 
    if (input.connectedAs) {
 
      this.connectionId = input.connectedAs;
 
    } else {
 
      this.onPatch(input as SyncgraphPatchMessage);
 
    }
 
  }
 

	
 
  private onPatch(input: SyncgraphPatchMessage) {
 
    log(`patch msg from server`);
 
    this.serverMessage.emit({ evType: "patch", body: input });
 
    this.messagesReceived++;
 
    this.updateStatus();
 
  }
 

	
 
  private onWsError(e: Event) {
 
    log("ws error", e);
 
    this.disconnect("ws error");
 
    this.updateStatus();
 
  }
 

	
 
  private onWsClose(ev: CloseEvent) {
 
    log("ws close");
 
    this.updateStatus();
 
    if (this.reconnectTimer !== undefined) {
 
      clearTimeout(this.reconnectTimer);
 
    }
 
    this.reconnectTimer = setTimeout(this.openConnection.bind(this), 1000);
 
  }
 

	
 
  private updateStatus() {
 
    const conn = (() => {
 
      if (this.ws === undefined) {
 
        return "no";
 
      } else {
 
        switch (this.ws.readyState) {
 
          case this.ws.CONNECTING:
 
            return "connecting";
 
          case this.ws.OPEN:
 
            return `open as ${this.connectionId}`;
 
          case this.ws.CLOSING:
 
            return "closing";
 
          case this.ws.CLOSED:
 
            return "close";
 
        }
 
      }
 
    })();
 

	
 
    const ping = this.pinger ? this.pinger.lastPingMs() : "...";
 
    this.statusDisplay.emit(`${conn}; ${this.messagesReceived} recv; ${this.messagesSent} sent; ping ${ping}ms`);
 
  }
 
}
web/SyncedGraph.ts
Show inline comments
 
import debug from "debug";
 
import * as N3 from "n3";
 
import { Quad, Quad_Object, Quad_Predicate, Quad_Subject } from "n3";
 
import { sortBy, unique } from "underscore";
 
import { AutoDependencies, HandlerFunc } from "./AutoDependencies";
 
import { Patch, patchToDeleteEntireGraph } from "./patch";
 
import { RdfDbClient } from "./rdfdbclient";
 

	
 
const log = debug("graph");
 

	
 
const RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#";
 

	
 
export class SyncedGraph {
 
  private autoDeps: AutoDependencies;
 
  private client: RdfDbClient;
 
  private graph: N3.Store;
 
  private cachedFloatValues: Map<string, number> = new Map();
 
  private cachedUriValues: Map<string, N3.NamedNode> = new Map();
 
  private prefixFuncs: (prefix: string) => N3.PrefixedToIri;
 
  private serial: any;
 
  private nextNumber: any;
 
  // Main graph object for a browser to use. Consider using RdfdbSyncedGraph element to create & own
 
  // one of these. Syncs both ways with rdfdb. Meant to hide the choice of RDF lib, so we can change it
 
  // later.
 
  //
 
  // Note that _applyPatch is the only method to write to the graph, so
 
  // it can fire subscriptions.
 

	
 
  constructor(
 
    // The /syncedGraph path of an rdfdb server.
 
    patchSenderUrl: string,
 
    // prefixes can be used in Uri(curie) calls. This mapping may grow during loadTrig calls.
 
    public prefixes: Map<string, string>,
 
    private setStatus: (status: string) => void
 
  ) {
 
    this.prefixFuncs = this.rebuildPrefixFuncs(prefixes);
 
    this.graph = new N3.Store();
 
    this.autoDeps = new AutoDependencies(this);
 
    this.autoDeps.graphError.subscribe((e) => {
 
      log("graph learned of error - reconnecting", e);
 
      this.client.disconnect("graph error");
 
    });
 
    this.clearGraph();
 

	
 
    this.client = new RdfDbClient(patchSenderUrl, this._clearGraphOnNewConnection.bind(this), this._applyPatch.bind(this), this.setStatus);
 
  }
 

	
 
  clearGraph() {
 
    // must not try send a patch to the server!
 
    // just deletes the statements; watchers are unaffected.
 
    this.cachedFloatValues = new Map(); // s + '|' + p -> number
 
    this.cachedUriValues = new Map(); // s + '|' + p -> Uri
 

	
 
    const p = patchToDeleteEntireGraph(this.graph);
 
    if (!p.isEmpty()) {
 
      this._applyPatch(p);
 
    }
 
    // if we had a Store already, this lets N3.Store free all its indices/etc
 
    this.graph = new N3.Store();
 
    this.rebuildPrefixFuncs(this.prefixes);
 
  }
 

	
 
  _clearGraphOnNewConnection() {
 
    // must not try send a patch to the server
 

	
 
    log("clearGraphOnNewConnection");
 
    this.clearGraph();
 
    log("clearGraphOnNewConnection done");
 
  }
 

	
 
  private rebuildPrefixFuncs(prefixes: Map<string, string>) {
 
    const p = Object.create(null);
 
    prefixes.forEach((v: string, k: string) => (p[k] = v));
 

	
 
    this.prefixFuncs = N3.Util.prefixes(p);
 
    return this.prefixFuncs;
 
  }
 

	
 
  U() {
 
    // just a shorthand
 
    return this.Uri.bind(this);
 
  }
 

	
 
  Uri(curie: string) {
 
    if (curie == null) {
 
      throw new Error("no uri");
 
    }
 
    if (curie.match(/^http/)) {
 
      return N3.DataFactory.namedNode(curie);
 
    }
 
    const part = curie.split(":");
 
    return this.prefixFuncs(part[0])(part[1]);
 
  }
 

	
 
  // Uri(shorten(u)).value==u
 
  shorten(uri: N3.NamedNode): string {
 
    for (let row of [
 
      { sh: "dev", lo: "http://light9.bigasterisk.com/theater/vet/device/" },
 
      { sh: "effect", lo: "http://light9.bigasterisk.com/effect/" },
 
      { sh: "", lo: "http://light9.bigasterisk.com/" },
 
      { sh: "rdfs", lo: "http://www.w3.org/2000/01/rdf-schema#" },
 
      { sh: "xsd", lo: "http://www.w3.org/2001/XMLSchema#" },
 
    ]) {
 
      if (uri.value.startsWith(row.lo)) {
 
        return row.sh + ":" + uri.value.substring(row.lo.length);
 
      }
 
    }
 
    return uri.value;
 
  }
 

	
 
  Literal(jsValue: string | number) {
 
    return N3.DataFactory.literal(jsValue);
 
  }
 

	
 
  LiteralRoundedFloat(f: number) {
 
    return N3.DataFactory.literal(f.toPrecision(3), this.Uri("http://www.w3.org/2001/XMLSchema#decimal"));
 
  }
 

	
 
  Quad(s: any, p: any, o: any, g: any) {
 
    return N3.DataFactory.quad(s, p, o, g);
 
  }
 

	
 
  toJs(literal: { value: any }) {
 
    // incomplete
 
    return parseFloat(literal.value);
 
  }
 

	
 
  loadTrig(trig: any, cb: () => any) {
 
    // for debugging
 
    const adds: Quad[] = [];
 
    const parser = new N3.Parser();
 
    parser.parse(trig, (error: any, quad: any, prefixes: any) => {
 
      if (error) {
 
        throw new Error(error);
 
      }
 
      if (quad) {
 
        adds.push(quad);
 
      } else {
 
        this._applyPatch(new Patch([], adds));
 
        // todo: here, add those prefixes to our known set
 
        if (cb) {
 
          cb();
 
        }
 
      }
 
    });
 
  }
 

	
 
  quads(): any {
 
    // for debugging
 
    return Array.from(this.graph.getQuads(null, null, null, null)).map((q: Quad) => [q.subject, q.predicate, q.object, q.graph]);
 
  }
 

	
 
  applyAndSendPatch(patch: Patch) {
 
    console.time("applyAndSendPatch");
 
    if (!this.client) {
 
      log("not connected-- dropping patch");
 
      throw new Error("no client yet");
 
    }
 
    if (patch.isEmpty()) {
 
      return;
 
    }
 
    if (!patch.isEmpty()) {
 
      this._applyPatch(patch);
 
      // // chaos delay
 
      //       setTimeout(()=>{
 
      if (this.client) {
 
        log("sending patch:\n", patch.dump());
 
        this.client.sendPatch(patch);
 
      }
 

	
 
      // },300*Math.random())
 
    }
 
    console.timeEnd("applyAndSendPatch");
 
  }
 

	
 
  _applyPatch(patch: Patch) {
 
    // In most cases you want applyAndSendPatch.
 
    //
 
    // This is the only method that writes to this.graph!
 
    if (patch.isEmpty()) throw "dont send empty patches here";
 
    log("_applyPatch [1] \n", patch.dump());
 
    this.cachedFloatValues.clear();
 
    this.cachedUriValues.clear();
 
    patch.applyToGraph(this.graph);
 
    if (false) {
 
      log("applied patch locally", patch.summary());
 
    } else {
 
      log("applied patch locally:\n" + patch.dump());
 
    }
 
    this.autoDeps.graphChanged(patch);
 
  }
 

	
 
  getObjectPatch(s: N3.NamedNode, p: N3.NamedNode, newObject: N3.Quad_Object | null, g: N3.NamedNode): Patch {
 
    // make a patch which removes existing values for (s,p,*,c) and
 
    // adds (s,p,newObject,c). Values in other graphs are not affected.
 
    const existing = this.graph.getQuads(s, p, null, g);
 
    return new Patch(existing, newObject !== null ? [this.Quad(s, p, newObject, g)] : []);
 
  }
 

	
 
  patchObject(s: N3.NamedNode, p: N3.NamedNode, newObject: N3.Quad_Object | null, g: N3.NamedNode) {
 
    this.applyAndSendPatch(this.getObjectPatch(s, p, newObject, g));
 
  }
 

	
 
  clearObjects(s: N3.NamedNode, p: N3.NamedNode, g: N3.NamedNode) {
 
    this.applyAndSendPatch(new Patch(this.graph.getQuads(s, p, null, g), []));
 
  }
 

	
 
  public runHandler(func: HandlerFunc, label: string) {
 
    // runs your func once, tracking graph calls. if a future patch
 
    // matches what you queried, we runHandler your func again (and
 
    // forget your queries from the first time).
 

	
 
    // helps with memleak? not sure yet. The point was if two matching
 
    // labels get puushed on, we should run only one. So maybe
 
    // appending a serial number is backwards.
 
    if (!this.serial) {
 
      this.serial = 1;
 
    }
 
    this.serial += 1;
 
    //label = label + @serial
 

	
 
    this.autoDeps.runHandler(func, label);
 
  }
 

	
 
  _singleValue(s: Quad_Subject, p: Quad_Predicate) {
 
    this.autoDeps.askedFor(s, p, null, null);
 
    const quads = this.graph.getQuads(s, p, null, null);
 
    const objs = new Set(Array.from(quads).map((q: Quad) => q.object));
 

	
 
    switch (objs.size) {
 
      case 0:
 
        throw new Error("no value for " + s.value + " " + p.value);
 
      case 1:
 
        var obj = objs.values().next().value;
 
        return obj;
 
      default:
 
        throw new Error("too many different values: " + JSON.stringify(quads));
 
    }
 
  }
 

	
 
  floatValue(s: Quad_Subject, p: Quad_Predicate) {
 
    const key = s.value + "|" + p.value;
 
    const hit = this.cachedFloatValues.get(key);
 
    if (hit !== undefined) {
 
      return hit;
 
    }
 
    //log('float miss', s, p)
 

	
 
    const v = this._singleValue(s, p).value;
 
    const ret = parseFloat(v);
 
    if (isNaN(ret)) {
 
      throw new Error(`${s.value} ${p.value} -> ${v} not a float`);
 
    }
 
    this.cachedFloatValues.set(key, ret);
 
    return ret;
 
  }
 

	
 
  stringValue(s: any, p: any) {
 
    return this._singleValue(s, p).value;
 
  }
 

	
 
  uriValue(s: Quad_Subject, p: Quad_Predicate) {
 
    const key = s.value + "|" + p.value;
 
    const hit = this.cachedUriValues.get(key);
 
    if (hit !== undefined) {
 
      return hit;
 
    }
 

	
 
    const ret = this._singleValue(s, p);
 
    this.cachedUriValues.set(key, ret);
 
    return ret;
 
  }
 

	
 
  labelOrTail(uri: { value: { split: (arg0: string) => any } }) {
 
    let ret: any;
 
    try {
 
      ret = this.stringValue(uri, this.Uri("rdfs:label"));
 
    } catch (error) {
 
      const words = uri.value.split("/");
 
      ret = words[words.length - 1];
 
    }
 
    if (!ret) {
 
      ret = uri.value;
 
    }
 
    return ret;
 
  }
 

	
 
  objects(s: any, p: any): Quad_Object[] {
 
    this.autoDeps.askedFor(s, p, null, null);
 
    const quads = this.graph.getQuads(s, p, null, null);
 
    return Array.from(quads).map((q: { object: any }) => q.object);
 
  }
 

	
 
  subjects(p: any, o: any): Quad_Subject[] {
 
    this.autoDeps.askedFor(null, p, o, null);
 
    const quads = this.graph.getQuads(null, p, o, null);
 
    return Array.from(quads).map((q: { subject: any }) => q.subject);
 
  }
 

	
 
  subjectStatements(s: Quad_Subject): Quad[] {
 
    this.autoDeps.askedFor(s, null, null, null);
 
    const quads = this.graph.getQuads(s, null, null, null);
 
    return quads;
 
  }
 

	
 
  items(list: any) {
 
    const out = [];
 
    let current = list;
 
    while (true) {
 
      if (current.value === RDF + "nil") {
 
        break;
 
      }
 

	
 
      this.autoDeps.askedFor(current, null, null, null); // a little loose
 

	
 
      const firsts = this.graph.getQuads(current, RDF + "first", null, null);
 
      const rests = this.graph.getQuads(current, RDF + "rest", null, null);
 
      if (firsts.length !== 1) {
 
        throw new Error(`list node ${current} has ${firsts.length} rdf:first edges`);
 
      }
 
      out.push(firsts[0].object);
 

	
 
      if (rests.length !== 1) {
 
        throw new Error(`list node ${current} has ${rests.length} rdf:rest edges`);
 
      }
 
      current = rests[0].object;
 
    }
 

	
 
    return out;
 
  }
 

	
 
  contains(s: any, p: any, o: any): boolean {
 
    this.autoDeps.askedFor(s, p, o, null);
 
    // Sure this is a nice warning to remind me to rewrite, but the graph.size call itself was taking 80% of the time in here
 
    // log("contains calling getQuads when graph has ", this.graph.size);
 
    return this.graph.getQuads(s, p, o, null).length > 0;
 
  }
 

	
 
  nextNumberedResources(base: { id: any }, howMany: number) {
 
    // base is NamedNode or string
 
    // Note this is unsafe before we're synced with the graph. It'll
 
    // always return 'name0'.
 
    if (base.id) {
 
      base = base.id;
 
    }
 
    const results = [];
 

	
 
    // @contains is really slow.
 
    if (this.nextNumber == null) {
 
      this.nextNumber = new Map();
 
    }
 
    let start = this.nextNumber.get(base);
 
    if (start === undefined) {
 
      start = 0;
 
    }
 

	
 
    for (let serial = start, asc = start <= 1000; asc ? serial <= 1000 : serial >= 1000; asc ? serial++ : serial--) {
 
      const uri = this.Uri(`${base}${serial}`);
 
      if (!this.contains(uri, null, null)) {
 
        results.push(uri);
 
        log("nextNumberedResources", `picked ${uri}`);
 
        this.nextNumber.set(base, serial + 1);
 
        if (results.length >= howMany) {
 
          return results;
 
        }
 
      }
web/rdfdbclient.ts
Show inline comments
 
import debug from "debug";
 
import { parseJsonPatch, Patch } from "./patch";
 
import { RdfDbChannel } from "./RdfDbChannel";
 
const log = debug("rdfdbclient");
 

	
 
export class RdfDbClient {
 
  private channel: RdfDbChannel;
 
  _patchesToSend: Patch[];
 
  private patchesToSend: Patch[];
 
  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
 
  //
 
  // What this should do, and does not yet, is keep the graph
 
  // 'coasting' over a reconnect, applying only the diffs from the old
 
  // contents to the new ones once they're in. Then, remove all the
 
  // clearGraph stuff in graph.coffee that doesn't even work right.
 
  //
 
  constructor(
 
    patchSenderUrl: string,
 
    private clearGraphOnNewConnection: () => void,
 
    private applyPatch: (p: Patch) => void,
 
    setStatus: (status: string) => void
 
  ) {
 
    this._patchesToSend = [];
 
    this.patchesToSend = [];
 
    this.channel = new RdfDbChannel(patchSenderUrl);
 
    this.channel.statusDisplay.subscribe((st: string) => {
 
      setStatus(st + `; ${this._patchesToSend.length} pending `);
 
      setStatus(st + `; ${this.patchesToSend.length} pending `);
 
    });
 
    this.channel.newConnection.subscribe(() => {
 
      this.clearGraphOnNewConnection();
 
    });
 
    this.channel.serverMessage.subscribe((m) => {
 
      parseJsonPatch(m.body, (p: Patch) => {
 
        log('patch from server:', p.dump())
 
        if (p.isEmpty()) {
 
          return;
 
        }
 
        this.applyPatch(p);
 
      });
 
    });
 
  }
 

	
 
  sendPatch(patch: Patch) {
 
    log("queue patch to server ", patch.summary());
 
    this._patchesToSend.push(patch);
 
    this.patchesToSend.push(patch);
 
    this._continueSending();
 
  }
 

	
 
  disconnect(why:string) {
 
    this.channel.disconnect(why);
 
  }
 

	
 
  async _continueSending() {
 
    // we could call this less often and coalesce patches together to optimize
 
    // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'.
 
    while (this._patchesToSend.length) {
 
      const patch = this._patchesToSend.splice(0, 1)[0];
 
    while (this.patchesToSend.length) {
 
      const patch = this.patchesToSend.splice(0, 1)[0];
 
      const json = await patch.toJsonPatch();
 
      const ret = this.channel.sendMessage(json);
 
      if (!ret) {
 
        log('sendMessage failed- retrying')
 
        this.patchesToSend.unshift(patch);
 
        setTimeout(this._continueSending.bind(this), 500);
 

	
 
        // this.disconnect()
 
        return;
 
      }
 
    }
 
  }
 
}
0 comments (0 inline, 0 general)