Changeset - ce97f298bfb8
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 9 years ago 2016-06-13 20:02:49
drewp@bigasterisk.com
restore zmq transport to collector
Ignore-this: 2d834c3bca0f7d594aee7469660e73f5
3 files changed with 57 insertions and 13 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -20,7 +20,7 @@ from light9.rdfdb.syncedgraph import Syn
 
from light9.rdfdb import clientsession
 

	
 
def parseJsonMessage(msg):
 
    body = json.load(msg)
 
    body = json.loads(msg)
 
    settings = []
 
    for device, attr, value in body['settings']:
 
        settings.append((URIRef(device), URIRef(attr), Literal(value)))
 
@@ -36,7 +36,7 @@ class WebServer(object):
 
    @app.route('/attrs', methods=['PUT'])
 
    def putAttrs(self, request):
 
        with WebServer.stats.setAttr.time():
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content)
 
            client, clientSession, settings, sendTime = parseJsonMessage(request.content.read())
 
            self.collector.setAttrs(client, clientSession, settings, sendTime)
 
            request.setResponseCode(202)
 

	
 
@@ -49,15 +49,17 @@ def startZmq(port, collector):
 
                              scales.PmfStat('setAttr'))
 
    
 
    zf = ZmqFactory()
 
    e = ZmqEndpoint('bind', 'tcp://*:%s' % port)
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttrZmq.time():
 
        with stats.setAttr.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            message[0]
 
            collector.setAttrs()
 
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 

	
 
def launch(graph, doLoadTest=False):
light9/collector/collector.py
Show inline comments
 
@@ -82,7 +82,10 @@ class Collector(object):
 
        Call with settings=[] to ping us that your session isn't dead.
 
        """
 
        now = time.time()
 
        print now - sendTime
 
        requestLag = now - sendTime
 
        if requestLag > .1:
 
            log.warn('collector.setAttrs from %s is running %.1fms after the request was made',
 
                     client, requestLag * 1000)
 

	
 
        self._forgetStaleClients(now)
 

	
light9/effect/sequencer.py
Show inline comments
 
@@ -4,7 +4,7 @@ copies from effectloop.py, which this sh
 

	
 
from __future__ import division
 
from rdflib import URIRef, Literal
 
from twisted.internet import reactor
 
from twisted.internet import reactor, defer
 
from webcolors import rgb_to_hex
 
import json, logging, bisect
 
import treq
 
@@ -18,20 +18,59 @@ from light9.namespaces import L9, RDF
 
from light9.vidref.musictime import MusicTime
 
from light9.effect import effecteval
 
from greplin import scales
 
from txzmq import ZmqEndpoint, ZmqFactory, ZmqPushConnection
 

	
 
log = logging.getLogger('sequencer')
 
stats = scales.collection('/sequencer/',
 
                                       scales.PmfStat('update'),
 
                          scales.DoubleStat('recentFps'),
 
)
 

	
 
                                       )
 
def sendToCollector(client, session, settings):
 
    return treq.put(networking.collector.path('attrs'),
 
                    data=json.dumps({'settings': settings,
 
_zmqClient=None
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        
 
    def send(self, msg):
 
        self.conn.push(msg)
 

	
 

	
 
def toCollectorJson(client, session, settings):
 
    return json.dumps({'settings': settings,
 
                                     'client': client,
 
                                     'clientSession': session,
 
                                     'sendTime': time.time(),
 
                                 }))
 
                  })
 
        
 
def sendToCollectorZmq(msg):
 
    global _zmqClient
 
    if _zmqClient is None:
 
        _zmqClient = TwistedZmqClient(networking.collectorZmq)
 
    _zmqClient.send(msg)
 
    return defer.succeed(0)
 
        
 
def sendToCollector(client, session, settings, useZmq=True):
 
    """deferred to the time in seconds it took to get a response from collector"""
 
    sendTime = time.time()
 
    msg = toCollectorJson(client, session, settings)
 

	
 
    if useZmq:
 
        d = sendToCollectorZmq(msg)
 
    else:
 
        d = treq.put(networking.collector.path('attrs'), data=msg)
 
    
 
    def onDone(result):
 
        dt = time.time() - sendTime
 
        if dt > .1:
 
            log.warn('sendToCollector request took %.1fms', dt * 1000)
 
        return dt
 
    d.addCallback(onDone)
 
    def onErr(err):
 
        log.warn('sendToCollector failed: %r', err)
 
    d.addErrback(onErr)
 
    return d
 

	
 

	
 
class Note(object):
0 comments (0 inline, 0 general)