diff --git a/light9/collector/collector.py b/light9/collector/collector.py
--- a/light9/collector/collector.py
+++ b/light9/collector/collector.py
@@ -109,9 +109,11 @@ class Collector(object):
             for outputAttr, value in attrs.iteritems():
                 self.setAttr(device, outputAttr, value, pendingOut)
 
-        print "%.1fms for client math" % (1000 * (time.time() - now))
+        dt1 = 1000 * (time.time() - now)
         self.flush(pendingOut)
-        print "%.1fms including flush" % (1000 * (time.time() - now))
+        dt2 = 1000 * (time.time() - now)
+        if dt1 > 10:
+            print "slow setAttrs: %.1fms -> flush -> %.1fms" % (dt1, dt2)
 
     def setAttr(self, device, outputAttr, value, pendingOut):
         output, index = self.outputMap[(device, outputAttr)]
diff --git a/light9/collector/output.py b/light9/collector/output.py
--- a/light9/collector/output.py
+++ b/light9/collector/output.py
@@ -4,7 +4,7 @@ import sys
 import time
 import usb.core
 import logging
-from twisted.internet import task
+from twisted.internet import task, threads, reactor
 from greplin import scales
 
 log = logging.getLogger('output')
@@ -54,6 +54,38 @@ class DmxOutput(Output):
     def flush(self):
         pass
 
+    def _loop(self):
+        """
+        Send currentBuffer from a worker thread, then schedule the
+        next send so consecutive sends start at least 50ms apart.
+
+        Subclasses provide sendDmx(buf), which runs outside the main
+        thread and returns True on success, and countError(), which
+        is called in the main thread after a failed send.
+        """
+        start = time.time()
+        sendingBuffer = self.currentBuffer
+
+        def done(worked):
+            if worked:
+                self.lastSentBuffer = sendingBuffer
+            else:
+                self.countError()
+
+        def failed(failure):
+            # sendDmx raised instead of returning False. Treat it as
+            # a failed send so the loop keeps running.
+            log.error("sendDmx failed: %s", failure.getErrorMessage())
+            self.countError()
+
+        def scheduleNext(result):
+            reactor.callLater(max(0, start + 0.050 - time.time()),
+                              self._loop)
+            return result
+
+        d = threads.deferToThread(self.sendDmx, sendingBuffer)
+        d.addCallbacks(done, failed)
+        d.addBoth(scheduleNext)
 
 class EnttecDmx(DmxOutput):
     stats = scales.collection('/output/enttecDmx',
@@ -68,7 +100,7 @@ class EnttecDmx(DmxOutput):
         self.dev = Dmx(devicePath)
         self.currentBuffer = ''
         self.lastLog = 0
-        task.LoopingCall(self._loop).start(0.050)
+        self._loop()
 
     @stats.update.time()
     def update(self, values):
@@ -82,9 +114,16 @@ class EnttecDmx(DmxOutput):
         self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 
-    @stats.write.time()
-    def _loop(self):
-        self.dev.write(self.currentBuffer)
+    def sendDmx(self, buf):
+        """Write buf to the device (not in main thread); True on success."""
+        with EnttecDmx.stats.write.time():
+            self.dev.write(buf)
+            return True
 
+    def countError(self):
+        """Called in the main thread after a failed send; counts nothing."""
+        pass
+
+
 class Udmx(DmxOutput):
     stats = scales.collection('/output/udmx',
                               scales.PmfStat('update'),
@@ -105,7 +144,7 @@ class Udmx(DmxOutput):
         # 2. Retries if there are usb errors.
         # Copying the LoopingCall logic accomplishes those with a
         # little wasted time if there are no updates.
-        task.LoopingCall(self._loop).start(0.050)
+        self._loop()
 
     @stats.update.time()
     def update(self, values):
@@ -115,15 +154,18 @@ class Udmx(DmxOutput):
             self.lastLog = now
 
         self.currentBuffer = ''.join(map(chr, values))
-    
-    def _loop(self):
-        #if self.lastSentBuffer == self.currentBuffer:
-        #    return
+
+    def sendDmx(self, buf):
+        """Send buf to the device; True on success, False on USBError."""
         with Udmx.stats.write.time():
             # frequently errors with usb.core.USBError
             try:
-                self.dev.SendDMX(self.currentBuffer)
-                self.lastSentBuffer = self.currentBuffer
+                self.dev.SendDMX(buf)
+                return True
             except usb.core.USBError:
-                Udmx.stats.usbErrors += 1
+                # not in main thread
+                return False
 
+    def countError(self):
+        # in main thread
+        Udmx.stats.usbErrors += 1