Changeset - 36f58b2aa8ef
[Not reviewed]
default
0 3 0
Drew Perttula - 9 years ago 2016-06-05 03:21:31
drewp@bigasterisk.com
browser syncedgraph sends patches back to server
Ignore-this: eb8d3f018ff97f7389c4af3efa62fd9
3 files changed with 65 insertions and 17 deletions:
0 comments (0 inline, 0 general)
bin/rdfdb
Show inline comments
 
@@ -396,51 +396,52 @@ class GraphClients(PrettyErrorHandler, c
 
            self.settings.db.addClient(Client(upd, self.get_argument("label")))
 
        except:
 
            import traceback
 
            traceback.print_exc()
 
            raise
 
            
 
_wsClientSerial = 0
 
class WebsocketClient(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        global _wsClientSerial
 
        connectionId = 'connection-%s' % _wsClientSerial
 
        _wsClientSerial += 1
 

	
 
        self.wsClient = WsClient(connectionId, self.sendMessage)
 
        log.info("new ws client %r", self.wsClient)
 
        self.settings.db.addClient(self.wsClient)
 

	
 
    def connectionLost(self, reason):
 
        log.info("bye ws client %r", self.wsClient)
 
        self.settings.db.clientErrored(
 
            Failure(WebsocketDisconnect(reason)), self.wsClient)
 

	
 
    def messageReceived(self, message):
 
        log.info("got message from %s: %s", self.connectionId, message)
 
        # how
 
        self.sendMessage(message)
 
        log.info("got message from %r: %s", self.wsClient, message)
 
        p = Patch(jsonRepr=message)
 
        p.senderUpdateUri = self.wsClient.updateUri
 
        self.settings.db.patch(p)
 

	
 
liveClients = set()
 
def sendToLiveClients(d=None, asJson=None):
 
    j = asJson or json.dumps(d)
 
    for c in liveClients:
 
        c.sendMessage(j)
 

	
 
class Live(cyclone.websocket.WebSocketHandler):
 

	
 
    def connectionMade(self, *args, **kwargs):
 
        log.info("websocket opened")
 
        liveClients.add(self)
 
        self.settings.db.sendClientsToAllLivePages()
 

	
 
    def connectionLost(self, reason):
 
        log.info("websocket closed")
 
        liveClients.remove(self)
 

	
 
    def messageReceived(self, message):
 
        log.info("got message %s" % message)
 
        self.sendMessage(message)
 

	
 
class NoExts(cyclone.web.StaticFileHandler):
 
    # .html pages can be get() without .html on them
light9/web/adjustable.coffee
Show inline comments
 
@@ -8,63 +8,60 @@ class Adjustable
 
  # The way dragging should work is that you start in the yellow *adj
 
  # widget*, wherever it is, but your drag is moving the *target*. The
 
  # adj will travel around too, but it may do extra moves to not bump
 
  # into stuff or to get out from under your finger.
 

	
 
  constructor: (@config) ->
 
    # config has:
 
    #   getTarget -> vec2 of current target position
 
    #   getSuggestedTargetOffset -> vec2 pixel offset from target
 
    #   emptyBox -> true if you want no value display
 

	
 
  getDisplayValue: () ->
 
    return '' if @config.emptyBox
 
    d3.format(".4g")(@_getValue())
 

	
 
  getCenter: () -> # vec2 of pixels
 
    @getTarget().add(@config.getSuggestedTargetOffset())
 

	
 
  getTarget: () -> # vec2 of pixels
 
    @config.getTarget()
 
            
 
  subscribe: (onChange) ->
 
    # change could be displayValue or center or target. This likely
 
    # calls onChange right away if there's any data yet.
 
    setInterval((() => onChange()), 100)
 
    throw new Error('not implemented')
 

	
 
  startDrag: () ->
 
    # todo
 
    @dragStartValue = @_getValue()
 
    # override
 

	
 
  continueDrag: (pos) ->
 
    # pos is vec2 of pixels relative to the drag start
 

	
 
    # override
 
    
 
    # todo
 
    newValue = @dragStartValue + pos.e(0) * .1
 
    graph.patchObject(@_subj, @_pred, graph.Literal(newValue), @_ctx)
 

	
 
  endDrag: () ->
 
    0
 
    # override
 

	
 
  _editorCoordinates: () -> # vec2 of mouse relative to <l9-t-editor>
 
    ev = d3.event.sourceEvent
 

	
 
    if ev.target.tagName == "LIGHT9-TIMELINE-EDITOR"
 
      rootElem = ev.target
 
    else
 
      rootElem = ev.target.closest('light9-timeline-editor')
 

	
 
    if ev.touches?.length
 
      ev = ev.touches[0]
 
      
 
    # storing root on the object to remember it across calls in case
 
    # you drag outside the editor.
 
    @root = rootElem.getBoundingClientRect() if rootElem
 
    offsetParentPos = $V([ev.pageX - @root.left, ev.pageY - @root.top])
 

	
 
    setMouse(offsetParentPos) # for debugging
 
    return offsetParentPos 
 

	
 
class window.AdjustableFloatObservable extends Adjustable
 
  constructor: (@config) ->
 
    # config also has:
 
    #   observable -> ko.observable we will read and write
 
@@ -94,25 +91,27 @@ class window.AdjustableFloatObject exten
 
    #   pred
 
    #   ctx
 
    #   getTargetTransform(value) -> getTarget result for value
 
    #   getValueForPos
 

	
 
    super(@config)
 
    
 
  _getValue: () ->
 
    # this is a big speedup- callers use _getValue about 4x as much as
 
    # the graph changes and graph.floatValue is slow
 
    @_currentValue
 

	
 
  getTarget: () ->
 
    @config.getTargetTransform(@_getValue())
 
    
 
  subscribe: (onChange) ->
 
    @config.graph.subscribe @config.subj, @config.pred, null, (patch) =>
 
      @_currentValue = @config.graph.floatValue(@config.subj, @config.pred)
 
      onChange()
 
    
 
  continueDrag: (pos) ->
 
    # pos is vec2 of pixels relative to the drag start
 
    
 
    newValue = @config.getValueForPos(@_editorCoordinates())
 
    @config.graph.patchObject(@config.subj, @config.pred, @config.graph.Literal(newValue), @_ctx)
 
    @config.graph.patchObject(@config.subj, @config.pred,
 
                              @config.graph.LiteralRoundedFloat(newValue),
 
                              @config.ctx)
light9/web/graph.coffee
Show inline comments
 
log = console.log
 

	
 
# Patch is {addQuads: <quads>, delQuads: <quads>}
 
# <quads> is [{subject: s, ...}, ...]
 

	
 
patchSizeSummary = (patch) ->
 
  '-' + patch.delQuads.length + ' +' + patch.addQuads.length
 

	
 
# partial port of autodepgraphapi.py
 
class GraphWatchers
 
  constructor: ->
 
    @handlersSp = {} # {s: {p: [handlers]}}
 
  subscribe: (s, p, o, onChange) -> # return subscription handle
 
    if o? then throw Error('not implemented')
 
    if not @handlersSp[s]
 
      @handlersSp[s] = {}
 
    if not @handlersSp[s][p]
 
      @handlersSp[s][p] = []
 
    @handlersSp[s][p].push(onChange)
 
    
 
  unsubscribe: (subscription) ->
 
    throw Error('not implemented')
 

	
 
  matchingHandlers: (quad) ->
 
    matches = []
 
    for subjDict in [@handlersSp[quad.subject] || {}, @handlersSp[null] || {}]
 
      for subjPredMatches in [subjDict[quad.predicate] || [], subjDict[null] || []]
 
        matches = matches.concat(subjPredMatches)
 
    return matches
 
    
 
  graphChanged: (patch) ->
 
    for quad in patch.delQuads
 
      for cb in @matchingHandlers(quad)
 
        # currently calls multiple times, which is ok, but we might
 
        # group things into fewer patches
 
        cb({delQuads: [quad], addQuads: []})
 
    for quad in patch.addQuads
 
      for cb in @matchingHandlers(quad)
 
        cb({delQuads: [], addQuads: [quad]})
 

	
 

	
 
jsonPatch = (jsPatch, cb) ->
 
  out = {patch: {adds: '', deletes: ''}}
 

	
 
  writeDels = (cb) ->
 
    writer = N3.Writer({ format: 'N-Quads' })
 
    writer.addTriples(jsPatch.delQuads)
 
    writer.end((err, result) ->
 
      out.patch.deletes = result
 
      cb())
 

	
 
  writeAdds = (cb) ->
 
    writer = N3.Writer({ format: 'N-Quads' })
 
    writer.addTriples(jsPatch.addQuads)
 
    writer.end((err, result) ->
 
      out.patch.adds = result
 
      cb())
 
    
 
  async.parallel([writeDels, writeAdds], (err) ->
 
      cb(JSON.stringify(out))
 
    )
 

	
 
class window.SyncedGraph
 
  # Note that applyPatch is the only method to write to the graph, so
 
  # it can fire subscriptions.
 

	
 
  constructor: (@patchSenderUrl, prefixes) ->
 
    @graph = N3.Store()
 
    @_addPrefixes(prefixes)
 
    @_watchers = new GraphWatchers()
 
    @patchesToSend = []
 
    @newConnection()
 

	
 
  newConnection: ->
 
    fullUrl = 'ws://' + window.location.host + @patchSenderUrl
 
    @ws = new WebSocket(fullUrl)
 

	
 
    @ws.onopen = =>
 
      log('connected to', fullUrl)
 

	
 
    @ws.onerror = (e) =>
 
      log('ws error ' + e)
 

	
 
    @ws.onclose = =>
 
      log('ws close')
 

	
 
    @ws.onmessage = (evt) =>
 
      @onMessage(JSON.parse(evt.data))
 

	
 
  onMessage: (msg) ->
 
    log('from rdfdb: ', msg)
 
    
 
    patch = {delQuads: [], addQuads: []}
 

	
 
    parseAdds = (cb) =>
 
      parser = N3.Parser()
 
      parser.parse msg.patch.adds, (error, quad, prefixes) =>
 
                    if (quad)
 
                      patch.addQuads.push(quad)
 
                    else
 
                      cb()
 
    parseDels = (cb) =>
 
      parser = N3.Parser()
 
      parser.parse msg.patch.deletes, (error, quad, prefixes) =>
 
                    if (quad)
 
                      patch.delQuads.push(quad)
 
                    else
 
                      cb()
 
      
 
    async.parallel([parseAdds, parseDels], ((err) => @applyPatch(patch)))
 

	
 
  _continueSending: ->
 
    if @ws.readyState != @ws.OPEN
 
      setTimeout(@_continueSending.bind(@), 500)
 
      return
 

	
 
    # we could call this less often and coalesce patches together to optimize
 
    # the dragging cases.
 

	
 
    sendOne = (patch, cb) =>
 
        jsonPatch(patch, (json) =>
 
          log('send patch to server, ' + json.length + ' bytes')
 
          @ws.send(json)
 
          cb(null)
 
      )
 

	
 
    async.eachSeries(@patchesToSend, sendOne, () =>
 
        @patchesToSend = []
 
      )
 
    
 
      
 
  _addPrefixes: (prefixes) ->
 
    @graph.addPrefixes(prefixes)
 
        
 
  Uri: (curie) ->
 
    N3.Util.expandPrefixedName(curie, @graph._prefixes)
 

	
 
  Literal: (jsValue) ->
 
    N3.Util.createLiteral(jsValue)
 

	
 
  LiteralRoundedFloat: (f) ->
 
    N3.Util.createLiteral(d3.format(".3f")(f),
 
                          "http://www.w3.org/2001/XMLSchema#decimal")
 

	
 
  toJs: (literal) ->
 
    # incomplete
 
    parseFloat(N3.Util.getLiteralValue(literal))
 

	
 
  loadTrig: (trig, cb) -> # for debugging
 
    patch = {delQuads: [], addQuads: []}
 
    parser = N3.Parser()
 
    parser.parse trig, (error, quad, prefixes) =>
 
                  if (quad)
 
                    patch.addQuads.push(quad)
 
                  else
 
                    @applyPatch(patch)
 
                    @_addPrefixes(prefixes)
 
                    cb() if cb
 
                    
 
  quads: () -> # for debugging
 
    [q.subject, q.predicate, q.object, q.graph] for q in @graph.find()
 

	
 
  applyAndSendPatch: (patch, cb) ->
 
    @applyPatch(patch)
 
    console.log('patch to server:')
 
    console.log('  delete:', JSON.stringify(patch.delQuads))
 
    console.log('  add:', JSON.stringify(patch.addQuads))
 
    # post to server
 
    console.log('queue patch to server ', patchSizeSummary(patch))
 
    @patchesToSend.push(patch)
 
    @_continueSending()
 

	
 
  applyPatch: (patch) ->
 
    # In most cases you want applyAndSendPatch.
 
    # 
 
    # This is the only method that writes to the graph!
 
    for quad in patch.delQuads
 
      @graph.removeTriple(quad)
 
    for quad in patch.addQuads
 
      @graph.addTriple(quad)
 
    log('applied patch -' + patch.delQuads.length + ' +' + patch.addQuads.length)
 
    log('applied patch locally', patchSizeSummary(patch))
 
    @_watchers.graphChanged(patch)
 

	
 
  getObjectPatch: (s, p, newObject, g) ->
 
    # send a patch which removes existing values for (s,p,*,c) and
 
    # adds (s,p,newObject,c). Values in other graphs are not affected.
 
    existing = @graph.findByIRI(s, p, null, g)
 
    return {
 
      delQuads: existing,
 
      addQuads: [{subject: s, predicate: p, object: newObject, graph: g}]
 
    }
 

	
 
  patchObject: (s, p, newObject, g) ->
 
    @applyAndSendPatch(@getObjectPatch(s, p, newObject, g))
 
  
 

	
 
  subscribe: (s, p, o, onChange) -> # return subscription handle
 
    # onChange is called with a patch that's limited to the quads
 
    # that match your request.
 
    # We call you immediately on existing triples.
 
    @_watchers.subscribe(s, p, o, onChange)
 
    immediatePatch = {delQuads: [], addQuads: @graph.findByIRI(s, p, o)}
 
    if immediatePatch.addQuads.length
 
      onChange(immediatePatch)
 

	
0 comments (0 inline, 0 general)