Files @ 36f58b2aa8ef
Branch filter:

Location: light9/light9/web/graph.coffee

Drew Perttula
browser syncedgraph sends patches back to server
Ignore-this: eb8d3f018ff97f7389c4af3efa62fd9
log = console.log

# Patch is {addQuads: <quads>, delQuads: <quads>}
# <quads> is [{subject: s, ...}, ...]

patchSizeSummary = (patch) ->
  '-' + patch.delQuads.length + ' +' + patch.addQuads.length

# partial port of autodepgraphapi.py
class GraphWatchers
  constructor: ->
    @handlersSp = {} # {s: {p: [handlers]}}
  subscribe: (s, p, o, onChange) -> # return subscription handle
    if o? then throw Error('not implemented')
    if not @handlersSp[s]
      @handlersSp[s] = {}
    if not @handlersSp[s][p]
      @handlersSp[s][p] = []
    @handlersSp[s][p].push(onChange)
    
  unsubscribe: (subscription) ->
    throw Error('not implemented')

  matchingHandlers: (quad) ->
    matches = []
    for subjDict in [@handlersSp[quad.subject] || {}, @handlersSp[null] || {}]
      for subjPredMatches in [subjDict[quad.predicate] || [], subjDict[null] || []]
        matches = matches.concat(subjPredMatches)
    return matches
    
  graphChanged: (patch) ->
    for quad in patch.delQuads
      for cb in @matchingHandlers(quad)
        # currently calls multiple times, which is ok, but we might
        # group things into fewer patches
        cb({delQuads: [quad], addQuads: []})
    for quad in patch.addQuads
      for cb in @matchingHandlers(quad)
        cb({delQuads: [], addQuads: [quad]})


jsonPatch = (jsPatch, cb) ->
  out = {patch: {adds: '', deletes: ''}}

  writeDels = (cb) ->
    writer = N3.Writer({ format: 'N-Quads' })
    writer.addTriples(jsPatch.delQuads)
    writer.end((err, result) ->
      out.patch.deletes = result
      cb())

  writeAdds = (cb) ->
    writer = N3.Writer({ format: 'N-Quads' })
    writer.addTriples(jsPatch.addQuads)
    writer.end((err, result) ->
      out.patch.adds = result
      cb())
    
  async.parallel([writeDels, writeAdds], (err) ->
      cb(JSON.stringify(out))
    )

class window.SyncedGraph
  # Note that applyPatch is the only method to write to the graph, so
  # it can fire subscriptions.

  constructor: (@patchSenderUrl, prefixes) ->
    @graph = N3.Store()
    @_addPrefixes(prefixes)
    @_watchers = new GraphWatchers()
    @patchesToSend = []
    @newConnection()

  newConnection: ->
    fullUrl = 'ws://' + window.location.host + @patchSenderUrl
    @ws = new WebSocket(fullUrl)

    @ws.onopen = =>
      log('connected to', fullUrl)

    @ws.onerror = (e) =>
      log('ws error ' + e)

    @ws.onclose = =>
      log('ws close')

    @ws.onmessage = (evt) =>
      @onMessage(JSON.parse(evt.data))

  onMessage: (msg) ->
    log('from rdfdb: ', msg)
    
    patch = {delQuads: [], addQuads: []}

    parseAdds = (cb) =>
      parser = N3.Parser()
      parser.parse msg.patch.adds, (error, quad, prefixes) =>
                    if (quad)
                      patch.addQuads.push(quad)
                    else
                      cb()
    parseDels = (cb) =>
      parser = N3.Parser()
      parser.parse msg.patch.deletes, (error, quad, prefixes) =>
                    if (quad)
                      patch.delQuads.push(quad)
                    else
                      cb()
      
    async.parallel([parseAdds, parseDels], ((err) => @applyPatch(patch)))

  _continueSending: ->
    if @ws.readyState != @ws.OPEN
      setTimeout(@_continueSending.bind(@), 500)
      return

    # we could call this less often and coalesce patches together to optimize
    # the dragging cases.

    sendOne = (patch, cb) =>
        jsonPatch(patch, (json) =>
          log('send patch to server, ' + json.length + ' bytes')
          @ws.send(json)
          cb(null)
      )

    async.eachSeries(@patchesToSend, sendOne, () =>
        @patchesToSend = []
      )
    
      
  _addPrefixes: (prefixes) ->
    @graph.addPrefixes(prefixes)
        
  Uri: (curie) ->
    N3.Util.expandPrefixedName(curie, @graph._prefixes)

  Literal: (jsValue) ->
    N3.Util.createLiteral(jsValue)

  LiteralRoundedFloat: (f) ->
    N3.Util.createLiteral(d3.format(".3f")(f),
                          "http://www.w3.org/2001/XMLSchema#decimal")

  toJs: (literal) ->
    # incomplete
    parseFloat(N3.Util.getLiteralValue(literal))

  loadTrig: (trig, cb) -> # for debugging
    patch = {delQuads: [], addQuads: []}
    parser = N3.Parser()
    parser.parse trig, (error, quad, prefixes) =>
                  if (quad)
                    patch.addQuads.push(quad)
                  else
                    @applyPatch(patch)
                    @_addPrefixes(prefixes)
                    cb() if cb
                    
  quads: () -> # for debugging
    [q.subject, q.predicate, q.object, q.graph] for q in @graph.find()

  applyAndSendPatch: (patch, cb) ->
    @applyPatch(patch)
    console.log('queue patch to server ', patchSizeSummary(patch))
    @patchesToSend.push(patch)
    @_continueSending()

  applyPatch: (patch) ->
    # In most cases you want applyAndSendPatch.
    # 
    # This is the only method that writes to the graph!
    for quad in patch.delQuads
      @graph.removeTriple(quad)
    for quad in patch.addQuads
      @graph.addTriple(quad)
    log('applied patch locally', patchSizeSummary(patch))
    @_watchers.graphChanged(patch)

  getObjectPatch: (s, p, newObject, g) ->
    # send a patch which removes existing values for (s,p,*,c) and
    # adds (s,p,newObject,c). Values in other graphs are not affected.
    existing = @graph.findByIRI(s, p, null, g)
    return {
      delQuads: existing,
      addQuads: [{subject: s, predicate: p, object: newObject, graph: g}]
    }

  patchObject: (s, p, newObject, g) ->
    @applyAndSendPatch(@getObjectPatch(s, p, newObject, g))
  

  subscribe: (s, p, o, onChange) -> # return subscription handle
    # onChange is called with a patch that's limited to the quads
    # that match your request.
    # We call you immediately on existing triples.
    @_watchers.subscribe(s, p, o, onChange)
    immediatePatch = {delQuads: [], addQuads: @graph.findByIRI(s, p, o)}
    if immediatePatch.addQuads.length
      onChange(immediatePatch)

  unsubscribe: (subscription) ->
    @_watchers.unsubscribe(subscription)

  floatValue: (s, p) ->
    quads = @graph.findByIRI(s, p)
    switch quads.length
      when 0 then throw new Error("no value for "+s+" "+p)
      when 1
        obj = quads[0].object
        return parseFloat(N3.Util.getLiteralValue(obj))
      else
        throw new Error("too many values: " + JSON.stringify(quads))
    
  stringValue: (s, p) ->

  uriValue: (s, p) ->

  objects: (s, p) ->

  subjects: (p, o) ->

  items: (list) ->

  contains: (s, p, o) ->

###
rdfstore.create((err, store) ->
  window.store = store
  store.setPrefix('l9', "http://light9.bigasterisk.com/")
  store.setPrefix('xsd', "http://www.w3.org/2001/XMLSchema#")
  store.load('text/turtle', "
@prefix : <http://light9.bigasterisk.com/> .
@prefix dev: <http://light9.bigasterisk.com/device/> .

:demoResource :startTime 0.5 .
    ", (err, n) ->
      console.log('loaded', n)
      store.graph (err, graph) ->
        window.graph = graph
        
    )
  window.URI = (curie) -> store.rdf.createNamedNode(store.rdf.resolve(curie))
  window.Lit = (value, dtype) -> store.rdf.createLiteral(value, null, URI(dtype))

  )
###