Changeset - 41f0fc1d51c7
[Not reviewed]
default
0 1 0
Drew Perttula - 10 years ago 2015-06-13 04:28:13
drewp@bigasterisk.com
refactor serial output
Ignore-this: 3e96d77b59879b2aecc1e6d562401da0
1 file changed with 31 insertions and 28 deletions:
0 comments (0 inline, 0 general)
light9/effecteval/effectloop.py
Show inline comments
 
@@ -172,11 +172,11 @@ class EffectLoop(object):
 
                ", ".join("%r: %.3g" % (str(k), v)
 
                          for k,v in out.get_levels().items()))
 

	
 
Z = numpy.zeros((50, 3), dtype=numpy.uint8)
 
Z = numpy.zeros((50, 3), dtype=numpy.float16)
 

	
 
class ControlBoard(object):
 
    def __init__(self, dev='/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A7027NYX-if00-port0'):
 
        self.board = serial.Serial(dev, baudrate=115200)
 
        self._dev = None#serial.Serial(dev, baudrate=115200)
 

	
 
    def _8bitMessage(self, floatArray):
 
        px255 = (numpy.clip(floatArray, 0, 1) * 255).astype(numpy.uint8)
 
@@ -190,7 +190,8 @@ class ControlBoard(object):
 
        command = {0: '\x00', 1: '\x01'}[which]
 
        if pixels.shape != (50, 3):
 
            raise ValueError("pixels was %s" % pixels.shape)
 
        self.board.write('\x60' + command + self._8bitMessage(pixels))
 
        self._dev.write('\x60' + command + self._8bitMessage(pixels))
 
        self._dev.flush()
 

	
 
    def setUv(self, which, level):
 
        """
 
@@ -198,7 +199,8 @@ class ControlBoard(object):
 
        level: 0 to 1
 
        """
 
        command = {0: '\x02', 1: '\x03'}[which]
 
        self.board.write('\x60' + command + chr(max(0, min(1, level)) * 255))
 
        self._dev.write('\x60' + command + chr(max(0, min(1, level)) * 255))
 
        self._dev.flush()
 

	
 
    def setRgb(self, color):
 
        """
 
@@ -206,48 +208,49 @@ class ControlBoard(object):
 
        """
 
        if color.shape != (1, 3):
 
            raise ValueError("color was %s" % color.shape)
 
        self.board.write('\x60\x04%s' + self._8bitMessage(color))
 
        self._dev.write('\x60\x04%s' + self._8bitMessage(color))
 
        self._dev.flush()
 

	
 
        
 
class LedLoop(EffectLoop):
 
    def initOutput(self):
 
        kw = dict(baudrate=115200)
 
        self.boards = serial.Serial('/dev/serial/by-id/usb-FTDI_FT232R_USB_UART_A7027NYX-if00-port0', **kw)
 
        self.lastSentBacklight = None
 
        self.board = ControlBoard()
 
        self.lastSent = {}
 
        
 
    def combineOutputs(self, outputs):
 
        combined = {'L': Z, 'R': Z, 'blacklight': 0}
 
        combined = {'L': Z, 'R': Z,
 
                    'blacklight0': 0, 'blacklight1': 0,
 
                    'rgb': numpy.zeros((1, 3), dtype=numpy.float16)}
 
        
 
        for out in outputs:
 
            if isinstance(out, Effects.Blacklight):
 
                combined['blacklight'] = max(combined['blacklight'], int(out * 255))
 
                key = 'blacklight%s' % out.which
 
                combined[key] = max(combined[key], out)
 
            elif isinstance(out, Effects.Strip):
 
                pixels = numpy.array(out.pixels, dtype=numpy.float16)
 
                px255 = (numpy.clip(pixels, 0, 1) * 255).astype(numpy.uint8)
 
                for w in out.which:
 
                    combined[w] = numpy.maximum(combined[w], px255)
 
                    combined[w] = numpy.maximum(combined[w], pixels)
 
                
 
        return combined
 

	
 
    @inlineCallbacks
 
    def sendOutput(self, combined):
 
        for which, px255 in combined.items():
 
            if which == 'blacklight':
 
                if px255 != self.lastSentBacklight:
 
                    b = min(255, max(0, px255))
 
                    yield succeed(self.serialWrite(self.boards['L'],
 
                                                   '\x60\x01' + chr(b)))
 
                    self.lastSentBacklight = px255
 
            else:
 
                board = self.boards[which]
 
                msg = '\x60\x00' + px255.reshape((-1,)).tostring()
 
                # may be stuttering more, and not smoother
 
                #yield threads.deferToThread(self.serialWrite, board, msg)
 
                yield succeed(self.serialWrite(board, msg))
 
        for meth, selectArgs, value in [
 
                ('setStrip', (0,), combined['L']),
 
                ('setStrip', (1,), combined['R']),
 
                ('setUv', (0,), combined['blacklight0']),
 
                ('setUv', (1,), combined['blacklight1']),
 
                ('setRgb', (), combined['rgb']),
 
        ]:
 
            key = (meth, selectArgs)
 
            if self.lastSent.get(key) == value:
 
                continue
 

	
 
    def serialWrite(self, serial, msg):
 
        serial.write(msg)
 
        serial.flush()
 
            log.debug('value changed: %s(%s %s)', meth, selectArgs, value)
 
            getattr(self.board, meth)(*(selectArgs + (value,)))
 
            self.lastSent[key] = value
 
                
 
        yield succeed(None) # there was an attempt at an async send
 
        
 
    def logMessage(self, out):
 
        return str([(w, p.tolist() if isinstance(p, numpy.ndarray) else p) for w,p in out.items()])
0 comments (0 inline, 0 general)