Changeset - 59eab70254fa
[Not reviewed]
default
0 1 0
Drew Perttula - 9 years ago 2016-06-05 05:01:22
drewp@bigasterisk.com
refactor the rdfdb networking out of (js) SyncedGraph
Ignore-this: badbef9ba60ad9f5611fdcdc67c5c34
1 file changed with 78 insertions and 76 deletions:
0 comments (0 inline, 0 general)
light9/web/graph.coffee
Show inline comments
 
@@ -39,7 +39,7 @@ class GraphWatchers
 
        cb({delQuads: [], addQuads: [quad]})
 

	
 

	
 
jsonPatch = (jsPatch, cb) ->
 
toJsonPatch = (jsPatch, cb) ->
 
  out = {patch: {adds: '', deletes: ''}}
 

	
 
  writeDels = (cb) ->
 
@@ -60,33 +60,50 @@ jsonPatch = (jsPatch, cb) ->
 
      cb(JSON.stringify(out))
 
    )
 

	
 
class window.SyncedGraph
 
  # Note that applyPatch is the only method to write to the graph, so
 
  # it can fire subscriptions.
 
parseJsonPatch = (jsonPatch, cb) ->
 
  # note response cb doesn't have an error arg.
 
  input = JSON.parse(jsonPatch)
 
  patch = {delQuads: [], addQuads: []}
 

	
 
  constructor: (@patchSenderUrl, @prefixes) ->
 
    @resetStore()
 
    @_watchers = new GraphWatchers()
 
    @patchesToSend = []
 
  parseAdds = (cb) =>
 
    parser = N3.Parser()
 
    parser.parse input.patch.adds, (error, quad, prefixes) =>
 
                  if (quad)
 
                    patch.addQuads.push(quad)
 
                  else
 
                    cb()
 
  parseDels = (cb) =>
 
    parser = N3.Parser()
 
    parser.parse input.patch.deletes, (error, quad, prefixes) =>
 
                  if (quad)
 
                    patch.delQuads.push(quad)
 
                  else
 
                    cb()
 

	
 
  async.parallel([parseAdds, parseDels], ((err) => cb(patch)))
 

	
 
class RdfDbClient
 
  # Send and receive patches from rdfdb
 
  constructor: (@patchSenderUrl, @clearGraph, @applyPatch) ->
 
    @_patchesToSend = []
 

	
 
    @_reconnectionTimeout = null
 
    @newConnection()
 
    @_newConnection()
 

	
 
  resetStore: ->
 
    if @graph?
 
      @_watchers.graphChanged({addQuads: [], delQuads: @graph.find()})
 
    @graph = N3.Store()
 
    @_addPrefixes(@prefixes)
 
  sendPatch: (patch) ->
 
    console.log('queue patch to server ', patchSizeSummary(patch))
 
    @_patchesToSend.push(patch)
 
    @_continueSending()           
 

	
 
  newConnection: ->
 
  _newConnection: ->
 
    fullUrl = 'ws://' + window.location.host + @patchSenderUrl
 
    @ws.close() if @ws?
 
    @ws = new WebSocket(fullUrl)
 

	
 
    @ws.onopen = =>
 
      log('connected to', fullUrl)
 
      @resetStore()
 
      @pingLoop()
 
      @clearGraph()
 
      @_pingLoop()
 

	
 
    @ws.onerror = (e) =>
 
      log('ws error ' + e)
 
@@ -95,41 +112,22 @@ class window.SyncedGraph
 
    @ws.onclose = =>
 
      log('ws close')
 
      clearTimeout(@_reconnectionTimeout) if @_reconnectionTimeout?
 
      @_reconnectionTimeout = setTimeout(@newConnection.bind(@), 1000)
 
      @_reconnectionTimeout = setTimeout(@_newConnection.bind(@), 1000)
 

	
 
    @ws.onmessage = (evt) =>
 
      if evt.data == 'PONG'
 
        return
 
      @onMessage(JSON.parse(evt.data))
 
    @ws.onmessage = @_onMessage.bind(@)
 

	
 
  pingLoop: () ->
 
  _pingLoop: () ->
 
    if @ws.readyState == @ws.OPEN
 
      @ws.send('PING')
 
      
 
      clearTimeout(@_pingLoopTimeout) if @_pingLoopTimeout?
 
      @_pingLoopTimeout = setTimeout(@pingLoop.bind(@), 10000)
 

	
 
  onMessage: (msg) ->
 
    log('from rdfdb: ', msg)
 
      
 
    patch = {delQuads: [], addQuads: []}
 
      @_pingLoopTimeout = setTimeout(@_pingLoop.bind(@), 10000)
 

	
 
    parseAdds = (cb) =>
 
      parser = N3.Parser()
 
      parser.parse msg.patch.adds, (error, quad, prefixes) =>
 
                    if (quad)
 
                      patch.addQuads.push(quad)
 
                    else
 
                      cb()
 
    parseDels = (cb) =>
 
      parser = N3.Parser()
 
      parser.parse msg.patch.deletes, (error, quad, prefixes) =>
 
                    if (quad)
 
                      patch.delQuads.push(quad)
 
                    else
 
                      cb()
 
      
 
    async.parallel([parseAdds, parseDels], ((err) => @applyPatch(patch)))
 
  _onMessage: (evt) ->
 
    msg = evt.data
 
    if msg == 'PONG'
 
      return
 
    parseJsonPatch(msg, @applyPatch.bind(@))
 

	
 
  _continueSending: ->
 
    if @ws.readyState != @ws.OPEN
 
@@ -140,15 +138,41 @@ class window.SyncedGraph
 
    # the dragging cases.
 

	
 
    sendOne = (patch, cb) =>
 
        jsonPatch(patch, (json) =>
 
        toJsonPatch(patch, (json) =>
 
          log('send patch to server, ' + json.length + ' bytes')
 
          @ws.send(json)
 
          cb(null)
 
      )
 

	
 
    async.eachSeries(@patchesToSend, sendOne, () =>
 
        @patchesToSend = []
 
    async.eachSeries(@_patchesToSend, sendOne, () =>
 
        @_patchesToSend = []
 
      )
 

	
 
class window.SyncedGraph
 
  # Main graph object for a browser to use. Syncs both ways with
 
  # rdfdb. Meant to hide the choice of RDF lib, so we can change it
 
  # later.
 
  # 
 
  # Note that _applyPatch is the only method to write to the graph, so
 
  # it can fire subscriptions.
 

	
 
  constructor: (@patchSenderUrl, @prefixes) ->
 
    # patchSenderUrl is the /syncedGraph path of an rdfdb server.
 
    # prefixes can be used in Uri(curie) calls.
 
    @_watchers = new GraphWatchers()
 
    @clearGraph()
 

	
 
    @_client = new RdfDbClient(@patchSenderUrl, @clearGraph.bind(@),
 
                               @_applyPatch.bind(@))
 
    
 
  clearGraph: ->
 
    log('SyncedGraph clear')
 
    if @graph?
 
      @_applyPatch({addQuads: [], delQuads: @graph.find()})
 

	
 
    # if we had a Store already, this lets N3.Store free all its indices/etc
 
    @graph = N3.Store()
 
    @_addPrefixes(@prefixes)
 
    
 
      
 
  _addPrefixes: (prefixes) ->
 
@@ -175,7 +199,7 @@ class window.SyncedGraph
 
                  if (quad)
 
                    patch.addQuads.push(quad)
 
                  else
 
                    @applyPatch(patch)
 
                    @_applyPatch(patch)
 
                    @_addPrefixes(prefixes)
 
                    cb() if cb
 
                    
 
@@ -183,15 +207,13 @@ class window.SyncedGraph
 
    [q.subject, q.predicate, q.object, q.graph] for q in @graph.find()
 

	
 
  applyAndSendPatch: (patch, cb) ->
 
    @applyPatch(patch)
 
    console.log('queue patch to server ', patchSizeSummary(patch))
 
    @patchesToSend.push(patch)
 
    @_continueSending()
 
    @_applyPatch(patch)
 
    @_client.sendPatch(patch)
 

	
 
  applyPatch: (patch) ->
 
  _applyPatch: (patch) ->
 
    # In most cases you want applyAndSendPatch.
 
    # 
 
    # This is the only method that writes to the graph!
 
    # This is the only method that writes to @graph!
 
    for quad in patch.delQuads
 
      @graph.removeTriple(quad)
 
    for quad in patch.addQuads
 
@@ -224,6 +246,7 @@ class window.SyncedGraph
 
  unsubscribe: (subscription) ->
 
    @_watchers.unsubscribe(subscription)
 

	
 

	
 
  floatValue: (s, p) ->
 
    quads = @graph.findByIRI(s, p)
 
    switch quads.length
 
@@ -246,24 +269,3 @@ class window.SyncedGraph
 

	
 
  contains: (s, p, o) ->
 

	
 
###
 
rdfstore.create((err, store) ->
 
  window.store = store
 
  store.setPrefix('l9', "http://light9.bigasterisk.com/")
 
  store.setPrefix('xsd', "http://www.w3.org/2001/XMLSchema#")
 
  store.load('text/turtle', "
 
@prefix : <http://light9.bigasterisk.com/> .
 
@prefix dev: <http://light9.bigasterisk.com/device/> .
 

	
 
:demoResource :startTime 0.5 .
 
    ", (err, n) ->
 
      console.log('loaded', n)
 
      store.graph (err, graph) ->
 
        window.graph = graph
 
        
 
    )
 
  window.URI = (curie) -> store.rdf.createNamedNode(store.rdf.resolve(curie))
 
  window.Lit = (value, dtype) -> store.rdf.createLiteral(value, null, URI(dtype))
 

	
 
  )
 
###
 
\ No newline at end of file
0 comments (0 inline, 0 general)