Changeset - 778c67ab70c9
[Not reviewed]
default
0 3 0
drewp@bigasterisk.com - 7 years ago 2018-06-08 10:41:36
drewp@bigasterisk.com
set zmq highWaterMark to dump stale messages, especially those sent when collector isn't running
Ignore-this: 83b2e509791edf2f738b4034339faece
3 files changed with 26 insertions and 14 deletions:
0 comments (0 inline, 0 general)
bin/collector
Show inline comments
 
@@ -48,15 +48,18 @@ def startZmq(port, collector):
 
    addr = 'tcp://*:%s' % port
 
    log.info('creating zmq endpoint at %r', addr)
 
    e = ZmqEndpoint('bind', addr)
 
    s = ZmqPullConnection(zf, e)
 
    def onPull(message):
 
        with stats.setAttr.time():
 
            # todo: new compressed protocol where you send all URIs up
 
            # front and then use small ints to refer to devices and
 
            # attributes in subsequent requests.
 
            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
            collector.setAttrs(client, clientSession, settings, sendTime)
 
    s.onPull = onPull
 
    class Pull(ZmqPullConnection):
 
        highWaterMark = 3
 
        def onPull(self, message):
 
            with stats.setAttr.time():
 
                # todo: new compressed protocol where you send all URIs up
 
                # front and then use small ints to refer to devices and
 
                # attributes in subsequent requests.
 
                client, clientSession, settings, sendTime = parseJsonMessage(message[0])
 
                collector.setAttrs(client, clientSession, settings, sendTime)
 
        
 
    s = Pull(zf, e)
 

	
 

	
 
class WebListeners(object):
 
    def __init__(self):
 
@@ -76,7 +79,11 @@ class WebListeners(object):
 
        """called often- don't be slow"""
 

	
 
        self.pendingMessageForDev[dev] = (attrs, outputMap)
 
        self._flush()
 
        try:
 
            self._flush()
 
        except Exception:
 
            traceback.print_exc()
 
            raise
 

	
 
    def _flush(self):
 
        now = time.time()
light9/collector/collector.py
Show inline comments
 
@@ -164,9 +164,12 @@ class Collector(Generic[ClientType, Clie
 
            except KeyError:
 
                log.warn("request for output to unconfigured device %s" % d)
 
                continue
 
            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
            if self.listeners:
 
                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            try:
 
                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
 
                if self.listeners:
 
                    self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
 
            except Exception as e:
 
                log.error('failing toOutputAttrs on %s: %r', d, e)
 
        
 
        pendingOut = {} # output : values
 
        for out in self.outputs:
light9/dmxclient.py
Show inline comments
 
@@ -21,7 +21,9 @@ class TwistedZmqClient(object):
 
    def __init__(self, service):
 
        zf = ZmqFactory()
 
        e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
 
        self.conn = ZmqPushConnection(zf, e)
 
        class Push(ZmqPushConnection):
 
            highWaterMark = 3
 
        self.conn = Push(zf, e)
 
        
 
    def send(self, clientid, levellist):
 
        self.conn.push(json.dumps({'clientid': clientid, 'levellist': levellist}))
0 comments (0 inline, 0 general)