changeset 1809:778c67ab70c9

set zmq highWaterMark to dump stale messages, especially those sent when collector isn't running Ignore-this: 83b2e509791edf2f738b4034339faece
author drewp@bigasterisk.com
date Fri, 08 Jun 2018 10:41:36 +0000
parents 86b53e1e7e52
children ad3775ffb622
files bin/collector light9/collector/collector.py light9/dmxclient.py
diffstat 3 files changed, 26 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/bin/collector	Fri Jun 08 10:11:13 2018 +0000
+++ b/bin/collector	Fri Jun 08 10:41:36 2018 +0000
@@ -48,15 +48,18 @@
     addr = 'tcp://*:%s' % port
     log.info('creating zmq endpoint at %r', addr)
     e = ZmqEndpoint('bind', addr)
-    s = ZmqPullConnection(zf, e)
-    def onPull(message):
-        with stats.setAttr.time():
-            # todo: new compressed protocol where you send all URIs up
-            # front and then use small ints to refer to devices and
-            # attributes in subsequent requests.
-            client, clientSession, settings, sendTime = parseJsonMessage(message[0])
-            collector.setAttrs(client, clientSession, settings, sendTime)
-    s.onPull = onPull
+    class Pull(ZmqPullConnection):
+        highWaterMark = 3
+        def onPull(self, message):
+            with stats.setAttr.time():
+                # todo: new compressed protocol where you send all URIs up
+                # front and then use small ints to refer to devices and
+                # attributes in subsequent requests.
+                client, clientSession, settings, sendTime = parseJsonMessage(message[0])
+                collector.setAttrs(client, clientSession, settings, sendTime)
+        
+    s = Pull(zf, e)
+
 
 class WebListeners(object):
     def __init__(self):
@@ -76,7 +79,11 @@
         """called often- don't be slow"""
 
         self.pendingMessageForDev[dev] = (attrs, outputMap)
-        self._flush()
+        try:
+            self._flush()
+        except Exception:
+            traceback.print_exc()
+            raise
 
     def _flush(self):
         now = time.time()
--- a/light9/collector/collector.py	Fri Jun 08 10:11:13 2018 +0000
+++ b/light9/collector/collector.py	Fri Jun 08 10:41:36 2018 +0000
@@ -164,9 +164,12 @@
             except KeyError:
                 log.warn("request for output to unconfigured device %s" % d)
                 continue
-            outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
-            if self.listeners:
-                self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
+            try:
+                outputAttrs[d] = toOutputAttrs(devType, deviceAttrs.get(d, {}))
+                if self.listeners:
+                    self.listeners.outputAttrsSet(d, outputAttrs[d], self.outputMap)
+            except Exception as e:
+                log.error('failing toOutputAttrs on %s: %r', d, e)
         
         pendingOut = {} # output : values
         for out in self.outputs:
--- a/light9/dmxclient.py	Fri Jun 08 10:11:13 2018 +0000
+++ b/light9/dmxclient.py	Fri Jun 08 10:41:36 2018 +0000
@@ -21,7 +21,9 @@
     def __init__(self, service):
         zf = ZmqFactory()
         e = ZmqEndpoint('connect', 'tcp://%s:%s' % (service.host, service.port))
-        self.conn = ZmqPushConnection(zf, e)
+        class Push(ZmqPushConnection):
+            highWaterMark = 3
+        self.conn = Push(zf, e)
         
     def send(self, clientid, levellist):
         self.conn.push(json.dumps({'clientid': clientid, 'levellist': levellist}))