Changeset - d0d5900a8031
[Not reviewed]
default
0 2 0
drewp@bigasterisk.com - 9 years ago 2016-06-11 20:30:34
drewp@bigasterisk.com
collector run the blocking dmx output calls in another thread, so they don't add delay to our http server
Ignore-this: b5385085b654e1ac0b8018e6813f36e
2 files changed with 37 insertions and 12 deletions:
0 comments (0 inline, 0 general)
light9/collector/collector.py
Show inline comments
 
@@ -106,15 +106,17 @@ class Collector(object):
 
        
 
        pendingOut = {} # output : values
 
        for device, attrs in outputAttrs.iteritems():
 
            for outputAttr, value in attrs.iteritems():
 
                self.setAttr(device, outputAttr, value, pendingOut)
 

	
 
        print "%.1fms for client math" % (1000 * (time.time() - now))
 
        dt1 = 1000 * (time.time() - now)
 
        self.flush(pendingOut)
 
        print "%.1fms including flush" % (1000 * (time.time() - now))
 
        dt2 = 1000 * (time.time() - now)
 
        if dt1 > 10:
 
            print "slow setAttrs: %.1fms -> flush -> %.1fms" % (dt1, dt2)
 

	
 
    def setAttr(self, device, outputAttr, value, pendingOut):
 
        output, index = self.outputMap[(device, outputAttr)]
 
        outList = pendingOut.setdefault(output, [])
 
        setListElem(outList, index, value, combine=max)
 

	
light9/collector/output.py
Show inline comments
 
from __future__ import division
 
from rdflib import URIRef
 
import sys
 
import time
 
import usb.core
 
import logging
 
from twisted.internet import task
 
from twisted.internet import task, threads, reactor
 
from greplin import scales
 
log = logging.getLogger('output')
 

	
 
def setListElem(outList, index, value, fill=0, combine=lambda old, new: new):
 
    if len(outList) < index:
 
        outList.extend([fill] * (index - len(outList)))
 
@@ -51,12 +51,26 @@ class DmxOutput(Output):
 
    def __init__(self, uri):
 
        self.uri = uri
 

	
 
    def flush(self):
 
        pass
 

	
 
    def _loop(self):
 
        start = time.time()
 
        sendingBuffer = self.currentBuffer
 

	
 
        def done(worked):
 
            if not worked:
 
                self.countError()
 
            else:
 
                self.lastSentBuffer = sendingBuffer
 
            reactor.callLater(max(0, start + 0.050 - time.time()),
 
                              self._loop)
 

	
 
        d = threads.deferToThread(self.sendDmx, sendingBuffer)
 
        d.addCallback(done)
 

	
 
class EnttecDmx(DmxOutput):
 
    stats = scales.collection('/output/enttecDmx',
 
                              scales.PmfStat('write'),
 
                              scales.PmfStat('update'))
 

	
 
@@ -65,13 +79,13 @@ class EnttecDmx(DmxOutput):
 

	
 
        sys.path.append("dmx_usb_module")
 
        from dmx import Dmx
 
        self.dev = Dmx(devicePath)
 
        self.currentBuffer = ''
 
        self.lastLog = 0
 
        task.LoopingCall(self._loop).start(0.050)
 
        self._loop()
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.info('enttec %s', ' '.join(map(str, values)))
 
@@ -79,15 +93,19 @@ class EnttecDmx(DmxOutput):
 

	
 
        # I was outputting on 76 and it was turning on the light at
 
        # dmx75. So I added the 0 byte. No notes explaining the footer byte.
 
        self.currentBuffer = '\x00' + ''.join(map(chr, values)) + "\x00"
 

	
 
    @stats.write.time()
 
    def _loop(self):
 
    def sendDmx(self, buf):
 
        self.dev.write(self.currentBuffer)
 

	
 
    def countError(self):
 
        pass
 

	
 
                                  
 
class Udmx(DmxOutput):
 
    stats = scales.collection('/output/udmx',
 
                              scales.PmfStat('update'),
 
                              scales.PmfStat('write'),
 
                              scales.IntStat('usbErrors'))
 
    def __init__(self, uri):
 
@@ -102,28 +120,33 @@ class Udmx(DmxOutput):
 
        # Doesn't actually need to get called repeatedly, but we do
 
        # need these two things:
 
        #   1. A throttle so we don't lag behind sending old updates.
 
        #   2. Retries if there are usb errors.
 
        # Copying the LoopingCall logic accomplishes those with a
 
        # little wasted time if there are no updates.
 
        task.LoopingCall(self._loop).start(0.050)
 
        #task.LoopingCall(self._loop).start(0.050)
 
        self._loop()
 

	
 
    @stats.update.time()
 
    def update(self, values):
 
        now = time.time()
 
        if now > self.lastLog + 1:
 
            log.info('udmx %s', ' '.join(map(str, values)))
 
            self.lastLog = now
 

	
 
        self.currentBuffer = ''.join(map(chr, values))
 
    
 
    def _loop(self):
 
        #if self.lastSentBuffer == self.currentBuffer:
 
        #    return
 
    def sendDmx(self, buf):
 
        with Udmx.stats.write.time():
 
            # frequently errors with usb.core.USBError
 
            try:
 
                self.dev.SendDMX(self.currentBuffer)
 
                self.lastSentBuffer = self.currentBuffer
 
                self.dev.SendDMX(buf)
 
                return True
 
            except usb.core.USBError:
 
                # not in main thread
 
                return False
 

	
 
    def countError(self):
 
        # in main thread
 
                Udmx.stats.usbErrors += 1
 

	
 
                                  
 
        
0 comments (0 inline, 0 general)