Changeset - 778c67ab70c9
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 7 years ago 2018-06-08 10:41:36
drewp@bigasterisk.com
set zmq highWaterMark to dump stale messages, especially those sent when collector isn't running
Ignore-this: 83b2e509791edf2f738b4034339faece
3 files changed with 26 insertions and 14 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -45,21 +45,24 @@ def startZmq(port, collector):
 
                              scales.PmfStat('setAttr'))
 
    
 
    zf = ZmqFactory()
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttr.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 
    class Pull(ZmqPullConnection):
 
        highWaterMark = 3
 
        def onPull(self, message):
 
            with stats.setAttr.time():
 
                # todo: new compressed protocol where you send all URIs up
 
                # front and then use small ints to refer to devices and
 
                # attributes in subsequent requests.
 
                client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
                collector.setAttrs(client, clientSession, settings, sendTime)
 
        
 
    s = Pull(zf, e)
 

	
 

	
 
class WebListeners(object):
 
    def __init__(self):
 
        self.clients = []
 
        self.pendingMessageForDev = {} # dev: (attrs, outputmap)
 
        self.lastFlush = 0
 
@@ -73,13 +76,17 @@ class WebListeners(object):
 
        log.info('delClient %s, %s left', client, len(self.clients))
 
        
 
    def outputAttrsSet(self, dev, attrs, outputMap):
 
        """called often- don't be slow"""
 

	
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        self._flush()
 
        try:
 
            self._flush()
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 
    def _flush(self):
 
        now = time.time()
 
        if now < self.lastFlush + .05 or not self.clients:
 
            return
 
        self.lastFlush = now
light9/collector/collector.py
Show inline comments
 
@@ -161,15 +161,18 @@ class Collector(Generic[ClientType, Clie
 
        for d in self.allDevices:
 
            try:
 
                devType = self.deviceType[d]
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
            if self.listeners:
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                if self.listeners:
 
                    self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 
        
 
        pendingOut = {} # output : values
 
        for out in self.outputs:
 
            pendingOut[out] = [0] * out.numChannels
 

	
 
        for device, attrs in outputAttrs.iteritems():
light9/dmxclient.py
Show inline comments
 
@@ -18,13 +18,15 @@ procname = procname.replace('.py', '')
 
_id = "%s-%s-%s" % (procname, socket.gethostname(), os.getpid())
 

	
 
class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        class Push(ZmqPushConnection):
 
            highWaterMark = 3
 
        self.conn = Push(zf, e)
 
        
 
    def send(self, clientid, levellist):
 
        self.conn.push(json.dumps({'clientid': clientid, 'levellist': levellist}))
 

	
 
def outputlevels(levellist,twisted=0,clientid=_id):
 
    """present a list of dmx channel levels, each scaled from
0 comments (0 inline, 0 general)