Changeset - ac55319a2eac
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 8 months ago 2024-05-21 23:10:39
drewp@bigasterisk.com
don't drop patches that arrive before we get WS connected
3 files changed with 13 insertions and 13 deletions:
0 comments (0 inline, 0 general)
web/RdfDbChannel.ts
Show inline comments
 
@@ -27,48 +27,49 @@ class ChannelPinger {
 
    }
 
    this.timeoutId = (setTimeout(this._pingLoop.bind(this), 10000) as unknown) as number;
 
  }
 
}
 

	
 
export class RdfDbChannel {
 
  // lower level reconnecting websocket -- knows about message types, but not what's inside a patch body
 
  private ws?: WebSocket = undefined;
 
  private pinger?: ChannelPinger;
 
  private connectionId: string = "none"; // server's name for us
 
  private reconnectTimer?: NodeJS.Timeout = undefined;
 
  private messagesReceived = 0; // (non-ping messages)
 
  private messagesSent = 0;
 

	
 
  newConnection: SubEvent<void> = new SubEvent();
 
  serverMessage: SubEvent<{ evType: string; body: SyncgraphPatchMessage }> = new SubEvent();
 
  statusDisplay: SubEvent<string> = new SubEvent();
 

	
 
  constructor(public patchSenderUrl: string) {
 
    this.openConnection();
 
  }
 
  sendMessage(body: string): boolean {
 
    // one try, best effort, true if we think it worked
 
    if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
 
      log("dropping message: " + body);
 
      return false;
 
    }
 
    log("send patch to server, " + body.length + " bytes");
 
    this.ws.send(body);
 
    this.messagesSent++;
 
    this.updateStatus();
 
    return true;
 
  }
 

	
 
  disconnect(why:string) {
 
    // will be followed by an autoconnect
 
    log("disconnect requested:", why);
 
    if (this.ws !== undefined) {
 
      const closeHandler = this.ws.onclose?.bind(this.ws);
 
      if (!closeHandler) {
 
        throw new Error();
 
      }
 
      closeHandler(new CloseEvent("forced"));
 
    }
 
  }
 

	
 
  private openConnection() {
 
    const wsOrWss = window.location.protocol.replace("http", "ws");
 
    const fullUrl = wsOrWss + "//" + window.location.host + this.patchSenderUrl;
web/SyncedGraph.ts
Show inline comments
 
@@ -130,65 +130,62 @@ export class SyncedGraph {
 
    const adds: Quad[] = [];
 
    const parser = new N3.Parser();
 
    parser.parse(trig, (error: any, quad: any, prefixes: any) => {
 
      if (error) {
 
        throw new Error(error);
 
      }
 
      if (quad) {
 
        adds.push(quad);
 
      } else {
 
        this._applyPatch(new Patch([], adds));
 
        // todo: here, add those prefixes to our known set
 
        if (cb) {
 
          cb();
 
        }
 
      }
 
    });
 
  }
 

	
 
  quads(): any {
 
    // for debugging
 
    return Array.from(this.graph.getQuads(null, null, null, null)).map((q: Quad) => [q.subject, q.predicate, q.object, q.graph]);
 
  }
 

	
 
  applyAndSendPatch(patch: Patch) {
 
    console.time("applyAndSendPatch");
 
    if (!this.client) {
 
      log("not connected-- dropping patch");
 
      throw new Error("no client yet");
 
    }
 
    if (patch.isEmpty()) {
 
      return;
 
    }
 
    if (!patch.isEmpty()) {
 
      this._applyPatch(patch);
 
      // // chaos delay
 
      //       setTimeout(()=>{
 
      if (this.client) {
 
        log("sending patch:\n", patch.dump());
 
        this.client.sendPatch(patch);
 
      }
 

	
 
      // },300*Math.random())
 
    }
 
    console.timeEnd("applyAndSendPatch");
 
  }
 

	
 
  _applyPatch(patch: Patch) {
 
    // In most cases you want applyAndSendPatch.
 
    //
 
    // This is the only method that writes to this.graph!
 
    if (patch.isEmpty()) throw "dont send empty patches here";
 
    log("_applyPatch [1] \n", patch.dump());
 
    this.cachedFloatValues.clear();
 
    this.cachedUriValues.clear();
 
    patch.applyToGraph(this.graph);
 
    if (false) {
 
      log("applied patch locally", patch.summary());
 
    } else {
 
      log("applied patch locally:\n" + patch.dump());
 
    }
 
    this.autoDeps.graphChanged(patch);
 
  }
 

	
 
  getObjectPatch(s: N3.NamedNode, p: N3.NamedNode, newObject: N3.Quad_Object | null, g: N3.NamedNode): Patch {
 
    // make a patch which removes existing values for (s,p,*,c) and
 
    // adds (s,p,newObject,c). Values in other graphs are not affected.
 
    const existing = this.graph.getQuads(s, p, null, g);
 
    return new Patch(existing, newObject !== null ? [this.Quad(s, p, newObject, g)] : []);
 
  }
web/rdfdbclient.ts
Show inline comments
 
import debug from "debug";
 
import { parseJsonPatch, Patch } from "./patch";
 
import { RdfDbChannel } from "./RdfDbChannel";
 
const log = debug("rdfdbclient");
 

	
 
export class RdfDbClient {
 
  private channel: RdfDbChannel;
 
  _patchesToSend: Patch[];
 
  private patchesToSend: Patch[];
 
  // Send and receive patches from rdfdb. Primarily used in SyncedGraph.
 
  //
 
  // What this should do, and does not yet, is keep the graph
 
  // 'coasting' over a reconnect, applying only the diffs from the old
 
  // contents to the new ones once they're in. Then, remove all the
 
  // clearGraph stuff in graph.coffee that doesn't even work right.
 
  //
 
  constructor(
 
    patchSenderUrl: string,
 
    private clearGraphOnNewConnection: () => void,
 
    private applyPatch: (p: Patch) => void,
 
    setStatus: (status: string) => void
 
  ) {
 
    this._patchesToSend = [];
 
    this.patchesToSend = [];
 
    this.channel = new RdfDbChannel(patchSenderUrl);
 
    this.channel.statusDisplay.subscribe((st: string) => {
 
      setStatus(st + `; ${this._patchesToSend.length} pending `);
 
      setStatus(st + `; ${this.patchesToSend.length} pending `);
 
    });
 
    this.channel.newConnection.subscribe(() => {
 
      this.clearGraphOnNewConnection();
 
    });
 
    this.channel.serverMessage.subscribe((m) => {
 
      parseJsonPatch(m.body, (p: Patch) => {
 
        log('patch from server:', p.dump())
 
        if (p.isEmpty()) {
 
          return;
 
        }
 
        this.applyPatch(p);
 
      });
 
    });
 
  }
 

	
 
  sendPatch(patch: Patch) {
 
    log("queue patch to server ", patch.summary());
 
    this._patchesToSend.push(patch);
 
    this.patchesToSend.push(patch);
 
    this._continueSending();
 
  }
 

	
 
  disconnect(why:string) {
 
    this.channel.disconnect(why);
 
  }
 

	
 
  async _continueSending() {
 
    // we could call this less often and coalesce patches together to optimize
 
    // the dragging cases. See rdfdb 'compactPatches' and 'processInbox'.
 
    while (this._patchesToSend.length) {
 
      const patch = this._patchesToSend.splice(0, 1)[0];
 
    while (this.patchesToSend.length) {
 
      const patch = this.patchesToSend.splice(0, 1)[0];
 
      const json = await patch.toJsonPatch();
 
      const ret = this.channel.sendMessage(json);
 
      if (!ret) {
 
        log('sendMessage failed- retrying')
 
        this.patchesToSend.unshift(patch);
 
        setTimeout(this._continueSending.bind(this), 500);
 

	
 
        // this.disconnect()
 
        return;
 
      }
 
    }
 
  }
 
}
0 comments (0 inline, 0 general)