Mercurial > code > home > repos > light9
changeset 1809:778c67ab70c9
set zmq highWaterMark to dump stale messages, especially those sent when collector isn't running
Ignore-this: 83b2e509791edf2f738b4034339faece
author | drewp@bigasterisk.com |
---|---|
date | Fri, 08 Jun 2018 10:41:36 +0000 |
parents | 86b53e1e7e52 |
children | ad3775ffb622 |
files | bin/collector light9/collector/collector.py light9/dmxclient.py |
diffstat | 3 files changed, 26 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/bin/collector Fri Jun 08 10:11:13 2018 +0000 +++ b/bin/collector Fri Jun 08 10:41:36 2018 +0000 @@ -48,15 +48,18 @@ addr = 'tcp://*:%s' % port log.info('creating zmq endpoint at %r', addr) e = ZmqEndpoint('bind', addr) - s = ZmqPullConnection(zf, e) - def onPull(message): - with stats.setAttr.time(): - # todo: new compressed protocol where you send all URIs up - # front and then use small ints to refer to devices and - # attributes in subsequent requests. - client, clientSession, settings, sendTime = parseJsonMessage(message[0]) - collector.setAttrs(client, clientSession, settings, sendTime) - s.onPull = onPull + class Pull(ZmqPullConnection): + highWaterMark = 3 + def onPull(self, message): + with stats.setAttr.time(): + # todo: new compressed protocol where you send all URIs up + # front and then use small ints to refer to devices and + # attributes in subsequent requests. + client, clientSession, settings, sendTime = parseJsonMessage(message[0]) + collector.setAttrs(client, clientSession, settings, sendTime) + + s = Pull(zf, e) + class WebListeners(object): def __init__(self): @@ -76,7 +79,11 @@ """called often- don't be slow""" self.pendingMessageForDev[dev] = (attrs, outputMap) - self._flush() + try: + self._flush() + except Exception: + traceback.print_exc() + raise def _flush(self): now = time.time()
--- a/light9/collector/collector.py Fri Jun 08 10:11:13 2018 +0000 +++ b/light9/collector/collector.py Fri Jun 08 10:41:36 2018 +0000 @@ -164,9 +164,12 @@ except KeyError: log.warn("request for output to unconfigured device %s" % d) continue - outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {})) - if self.listeners: - self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) + try: + outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {})) + if self.listeners: + self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap) + except Exception as e: + log.error('failing toOutputAttrs on %s: %r', d, e) pendingOut = {} # output : values for out in self.outputs:
--- a/light9/dmxclient.py Fri Jun 08 10:11:13 2018 +0000 +++ b/light9/dmxclient.py Fri Jun 08 10:41:36 2018 +0000 @@ -21,7 +21,9 @@ def __init__(self, service): zf = ZmqFactory() e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port)) - self.conn = ZmqPushConnection(zf, e) + class Push(ZmqPushConnection): + highWaterMark = 3 + self.conn = Push(zf, e) def send(self, clientid, levellist): self.conn.push(json.dumps({'clientid': clientid, 'levellist': levellist}))