Notes (free text ahead of the first diff header, as in `git format-patch` output):
  * bin/collector: the pull endpoint becomes a ZmqPullConnection subclass with
    highWaterMark = 3 and onPull defined as a method; the caller of
    WebListeners._flush now prints the traceback and re-raises on failure.
  * light9/collector/collector.py: an exception from toOutputAttrs or from
    listeners.outputAttrsSet is caught and reported through log.error.
  * light9/dmxclient.py: the push connection becomes a ZmqPushConnection
    subclass with highWaterMark = 3.
  * NOTE(review): leading indentation and blank context lines were rebuilt from
    the Python block structure and the @@ line counts; confirm against the
    repository before applying.

diff --git a/bin/collector b/bin/collector
--- a/bin/collector
+++ b/bin/collector
@@ -48,15 +48,18 @@ def startZmq(port, collector):
     addr = 'tcp://*:%s' % port
     log.info('creating zmq endpoint at %r', addr)
     e = ZmqEndpoint('bind', addr)
-    s = ZmqPullConnection(zf, e)
-    def onPull(message):
-        with stats.setAttr.time():
-            # todo: new compressed protocol where you send all URIs up
-            # front and then use small ints to refer to devices and
-            # attributes in subsequent requests.
-            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
-            collector.setAttrs(client, clientSession, settings, sendTime)
-    s.onPull = onPull
+    class Pull(ZmqPullConnection):
+        highWaterMark = 3
+        def onPull(self, message):
+            with stats.setAttr.time():
+                # todo: new compressed protocol where you send all URIs up
+                # front and then use small ints to refer to devices and
+                # attributes in subsequent requests.
+                client, clientSession, settings, sendTime = parseJsonMessage(message[0])
+                collector.setAttrs(client, clientSession, settings, sendTime)
+
+    s = Pull(zf, e)
+
 
 class WebListeners(object):
     def __init__(self):
@@ -76,7 +79,11 @@ class WebListeners(object):
         """called often- don't be slow"""
 
         self.pendingMessageForDev[dev] = (attrs, outputMap)
-        self._flush()
+        try:
+            self._flush()
+        except Exception:
+            traceback.print_exc()
+            raise
 
     def _flush(self):
         now = time.time()
diff --git a/light9/collector/collector.py b/light9/collector/collector.py
--- a/light9/collector/collector.py
+++ b/light9/collector/collector.py
@@ -164,9 +164,12 @@ class Collector(Generic[ClientType, Clie
             except KeyError:
                 log.warn("request for output to unconfigured device %s" % d)
                 continue
-            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
-            if self.listeners:
-                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
+            try:
+                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
+                if self.listeners:
+                    self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
+            except Exception as e:
+                log.error('failing toOutputAttrs on %s: %r', d, e)
 
         pendingOut = {} # output : values
         for out in self.outputs:
diff --git a/light9/dmxclient.py b/light9/dmxclient.py
--- a/light9/dmxclient.py
+++ b/light9/dmxclient.py
@@ -21,7 +21,9 @@ class TwistedZmqClient(object):
     def __init__(self, service):
         zf = ZmqFactory()
         e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
-        self.conn = ZmqPushConnection(zf, e)
+        class Push(ZmqPushConnection):
+            highWaterMark = 3
+        self.conn = Push(zf, e)
 
     def send(self, clientid, levellist):
         self.conn.push(json.dumps({'clientid': clientid, 'levellist': levellist}))