diff --git a/bin/collector b/bin/collector --- a/bin/collector +++ b/bin/collector @@ -48,15 +48,18 @@ def startZmq(port, collector): addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) e = ZmqEndpoint('bind', addr) - s = ZmqPullConnection(zf, e) - def onPull(message): - with stats.setAttr.time(): - # todo: new compressed protocol where you send all URIs up - # front and then use small ints to refer to devices and - # attributes in subsequent requests. - client, clientSession, settings, sendTime = parseJsonMessage(message[0]) - collector.setAttrs(client, clientSession, settings, sendTime) - s.onPull = onPull + class Pull(ZmqPullConnection): + highWaterMark = 3 + def onPull(self, message): + with stats.setAttr.time(): + # todo: new compressed protocol where you send all URIs up + # front and then use small ints to refer to devices and + # attributes in subsequent requests. + client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + collector.setAttrs(client, clientSession, settings, sendTime) + + s = Pull(zf, e) + class WebListeners(object): def __init__(self): @@ -76,7 +79,11 @@ class WebListeners(object): """called often- don't be slow""" self.pendingMessageForDev[dev] = (attrs, outputMap) - self._flush() + try: + self._flush() + except Exception: + traceback.print_exc() + raise def _flush(self): now = time.time()