changeset 1436:d0d5900a8031

collector run the blocking dmx output calls in another thread, so they don't add delay to our http server Ignore-this: b5385085b654e1ac0b8018e6813f36e
author drewp@bigasterisk.com
date Sat, 11 Jun 2016 20:30:34 +0000
parents da8f36f809f1
children d149a2c2236c
files light9/collector/collector.py light9/collector/output.py
diffstat 2 files changed, 39 insertions(+), 14 deletions(-) [+]
line wrap: on
line diff
--- a/light9/collector/collector.py	Sat Jun 11 17:18:05 2016 +0000
+++ b/light9/collector/collector.py	Sat Jun 11 20:30:34 2016 +0000
@@ -109,9 +109,11 @@
             for outputAttr, value in attrs.iteritems():
                 self.setAttr(device, outputAttr, value, pendingOut)
 
-        print "%.1fms for client math" % (1000 * (time.time() - now))
+        dt1 = 1000 * (time.time() - now)
         self.flush(pendingOut)
-        print "%.1fms including flush" % (1000 * (time.time() - now))
+        dt2 = 1000 * (time.time() - now)
+        if dt1 > 10:
+            print "slow setAttrs: %.1fms -> flush -> %.1fms" % (dt1, dt2)
 
     def setAttr(self, device, outputAttr, value, pendingOut):
         output, index = self.outputMap[(device, outputAttr)]
--- a/light9/collector/output.py	Sat Jun 11 17:18:05 2016 +0000
+++ b/light9/collector/output.py	Sat Jun 11 20:30:34 2016 +0000
@@ -4,7 +4,7 @@
 import time
 import usb.core
 import logging
-from twisted.internet import task
+from twisted.internet import task, threads, reactor
 from greplin import scales
 log = logging.getLogger('output')
 
@@ -54,6 +54,20 @@
     def flush(self):
         pass
 
+    def _loop(self):
+        start = time.time()
+        sendingBuffer = self.currentBuffer
+
+        def done(worked):
+            if not worked:
+                self.countError()
+            else:
+                self.lastSentBuffer = sendingBuffer
+            reactor.callLater(max(0, start + 0.050 - time.time()),
+                              self._loop)
+
+        d = threads.deferToThread(self.sendDmx, sendingBuffer)
+        d.addCallback(done)
 
 class EnttecDmx(DmxOutput):
     stats = scales.collection('/output/enttecDmx',
@@ -68,7 +82,7 @@
         self.dev = Dmx(devicePath)
         self.currentBuffer = ''
         self.lastLog = 0
-        task.LoopingCall(self._loop).start(0.050)
+        self._loop()
 
     @stats.update.time()
     def update(self, values):
@@ -82,9 +96,13 @@
         self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 
     @stats.write.time()
-    def _loop(self):
+    def sendDmx(self, buf):
         self.dev.write(self.currentBuffer)
 
+    def countError(self):
+        pass
+
+                                  
 class Udmx(DmxOutput):
     stats = scales.collection('/output/udmx',
                               scales.PmfStat('update'),
@@ -105,7 +123,8 @@
         #   2. Retries if there are usb errors.
         # Copying the LoopingCall logic accomplishes those with a
         # little wasted time if there are no updates.
-        task.LoopingCall(self._loop).start(0.050)
+        #task.LoopingCall(self._loop).start(0.050)
+        self._loop()
 
     @stats.update.time()
     def update(self, values):
@@ -115,15 +134,19 @@
             self.lastLog = now
 
         self.currentBuffer = ''.join(map(chr, values))
-    
-    def _loop(self):
-        #if self.lastSentBuffer == self.currentBuffer:
-        #    return
+
+    def sendDmx(self, buf):
         with Udmx.stats.write.time():
-            # frequently errors with usb.core.USBError
             try:
-                self.dev.SendDMX(self.currentBuffer)
-                self.lastSentBuffer = self.currentBuffer
+                self.dev.SendDMX(buf)
+                return True
             except usb.core.USBError:
-                Udmx.stats.usbErrors += 1
+                # not in main thread
+                return False
 
+    def countError(self):
+        # in main thread
+        Udmx.stats.usbErrors += 1
+                
+                                  
+