Mercurial > code > home > repos > light9
changeset 1436:d0d5900a8031
collector run the blocking dmx output calls in another thread, so they don't add delay to our http server
Ignore-this: b5385085b654e1ac0b8018e6813f36e
author | drewp@bigasterisk.com |
---|---|
date | Sat, 11 Jun 2016 20:30:34 +0000 |
parents | da8f36f809f1 |
children | d149a2c2236c |
files | light9/collector/collector.py light9/collector/output.py |
diffstat | 2 files changed, 39 insertions(+), 14 deletions(-) [+] |
line wrap: on
line diff
--- a/light9/collector/collector.py Sat Jun 11 17:18:05 2016 +0000 +++ b/light9/collector/collector.py Sat Jun 11 20:30:34 2016 +0000 @@ -109,9 +109,11 @@ for outputAttr, value in attrs.iteritems(): self.setAttr(device, outputAttr, value, pendingOut) - print "%.1fms for client math" % (1000 * (time.time() - now)) + dt1 = 1000 * (time.time() - now) self.flush(pendingOut) - print "%.1fms including flush" % (1000 * (time.time() - now)) + dt2 = 1000 * (time.time() - now) + if dt1 > 10: + print "slow setAttrs: %.1fms -> flush -> %.1fms" % (dt1, dt2) def setAttr(self, device, outputAttr, value, pendingOut): output, index = self.outputMap[(device, outputAttr)]
--- a/light9/collector/output.py Sat Jun 11 17:18:05 2016 +0000 +++ b/light9/collector/output.py Sat Jun 11 20:30:34 2016 +0000 @@ -4,7 +4,7 @@ import time import usb.core import logging -from twisted.internet import task +from twisted.internet import task, threads, reactor from greplin import scales log = logging.getLogger('output') @@ -54,6 +54,20 @@ def flush(self): pass + def _loop(self): + start = time.time() + sendingBuffer = self.currentBuffer + + def done(worked): + if not worked: + self.countError() + else: + self.lastSentBuffer = sendingBuffer + reactor.callLater(max(0, start + 0.050 - time.time()), + self._loop) + + d = threads.deferToThread(self.sendDmx, sendingBuffer) + d.addCallback(done) class EnttecDmx(DmxOutput): stats = scales.collection('/output/enttecDmx', @@ -68,7 +82,7 @@ self.dev = Dmx(devicePath) self.currentBuffer = '' self.lastLog = 0 - task.LoopingCall(self._loop).start(0.050) + self._loop() @stats.update.time() def update(self, values): @@ -82,9 +96,13 @@ self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00" @stats.write.time() - def _loop(self): + def sendDmx(self, buf): self.dev.write(self.currentBuffer) + def countError(self): + pass + + class Udmx(DmxOutput): stats = scales.collection('/output/udmx', scales.PmfStat('update'), @@ -105,7 +123,8 @@ # 2. Retries if there are usb errors. # Copying the LoopingCall logic accomplishes those with a # little wasted time if there are no updates. - task.LoopingCall(self._loop).start(0.050) + #task.LoopingCall(self._loop).start(0.050) + self._loop() @stats.update.time() def update(self, values): @@ -115,15 +134,19 @@ self.lastLog = now self.currentBuffer = ''.join(map(chr, values)) - - def _loop(self): - #if self.lastSentBuffer == self.currentBuffer: - # return + + def sendDmx(self, buf): with Udmx.stats.write.time(): - # frequently errors with usb.core.USBError try: - self.dev.SendDMX(self.currentBuffer) - self.lastSentBuffer = self.currentBuffer + self.dev.SendDMX(buf) + return True except usb.core.USBError: - Udmx.stats.usbErrors += 1 + # not in main thread + return False + def countError(self): + # in main thread + Udmx.stats.usbErrors += 1 + + +