view service/sba/sba.py @ 808:867f59c83dba

moved from proj/pixel Ignore-this: 97cafcfb96fd33ea70fcd467659ee31a darcs-hash:20110808045021-312f9-be4547f197b49b893d6e873f9c96f30491d0bdc7.gz
author drewp <drewp@bigasterisk.com>
date Sun, 07 Aug 2011 21:50:21 -0700
parents
children be855a111619
line wrap: on
line source

from __future__ import division
import serial, time, jsonlib, sys, cgi, argparse
import cyclone.web
from twisted.python import log
from twisted.internet import reactor

class Sba(object):
    def __init__(self, port="/dev/ttyACM0"):
        self.port = port
        self.reset()

    def reset(self):
        log.msg("reopening port")
        self.s = serial.Serial(self.port, baudrate=115200)
        log.msg(str(self.s.__dict__))
        self.sendControl()

    def sendControl(self):
        controlBits = [0, 1,
                       0, 0, 0,
                       1, 1, 1, 1, 1, 1, 1, # b correction
                       0, 0, 0,
                       1, 1, 1, 1, 1, 1, 1, # g correction
                       0,
                       0, 0, # clock mode 00=internal
                       1, 1, 1, 1, 1, 1, 1, # r correction
                       ]

        control = reduce(lambda a, b: a<<1 | b,
                         #controlBits
                         reversed(controlBits)
                         )
        self.send("C" + hex(control)[2:].zfill(8))
        self.send("E0")
        
    def send(self, cmd, getResponse=True):
        """
        send a command using the protocol from http://engr.biz/prod/SB-A/

        we will attach the carriage return, cmd is just a string like 'V'

        Returns the response line, like '+OK'
        """
        try:
            self.s.write(cmd + "\r")
        except OSError:
            self.reset()
            
        if getResponse:
            return self.s.readline().strip()

    def rgbs(self, rgbs):
        """
        send a set of full rgb packets. Values are 0..1023.
        """
        t1 = time.time()
        for (r,g,b) in rgbs:
            packed = (b & 0x3ff) << 20 | (r & 0x3ff) << 10 | (g & 0x3ff)
            self.send("D%08x" % packed, getResponse=False)

        self.send("L1", getResponse=False)
        self.send("L0", getResponse=False)
        sends = time.time() - t1
        # doing all the reads together triples the transmission rate
        t2 = time.time()
        [self.s.readline() for loop in range(2 + len(rgbs))]
        reads = time.time() - t2

        log.msg("%.1f ms for sends, %.1f ms for reads" % (
            1000 * sends, 1000 * reads))

class BriteChain(object):
    def __init__(self, sba):
        self.sba = sba
        self.colors = []

    def setColor(self, pos, color):
        """color is (r,g,b) 10-bit int. The highest position you ever
        set is how many channels we'll output"""
        if len(self.colors) <= pos:
            self.colors.extend([(0,0,0)]*(pos - len(self.colors) + 1))
        self.colors[pos] = color
        self.refresh()
        
    def getColor(self, pos):
        try:
            return self.colors[pos]
        except IndexError:
            return (0,0,0)

    def refresh(self):
        self.sba.rgbs(self.colors[::-1])

class IndexHandler(cyclone.web.RequestHandler):
    def get(self):
        self.set_header("Content-type", "text/html")
        self.write(open("sba.html").read())

class BriteHandler(cyclone.web.RequestHandler):
    """
    /brite/0 is the first shiftbrite on the chain. Put a text/plain
    color like #ffffff (8-bit) or a application/json value like
    {"rgb10":[1023,1023,1023]} (for 10-bit). GET (with accept, to pick
    your format) to learn the current color. 

    /brite/1 affects the second shiftbrite on the chain, etc 
    """
    def put(self, pos):
        d = self.request.body
        ctype = self.request.headers.get("Content-Type")
        if ';' in ctype: 
            ctype = ctype.split(';')[0].strip()
        if ctype == 'text/plain':
            color = decode8bitHexColor(d)
        elif ctype == 'application/json':
            color = jsonlib.read(d)['rgb10']
        elif ctype == 'application/x-www-form-urlencoded':
            color = decode8bitHexColor(cgi.parse_qs(d)['color'][0])
        else:
            self.response.set_status(415, "use text/plain, application/json, "
                                     "or application/x-www-form-urlencoded")
            return

        self.settings.chain.setColor(int(pos), color)
        self.set_header("Content-Type", "text/plain")
        self.write("set %s\n" % pos)

    def post(self, pos):
        self.put(pos)
        self.redirect("..")

    def get(self, pos):
        # todo: content neg
        color = self.settings.chain.getColor(int(pos))
        self.set_header("Content-Type", "text/plain")
        self.write(encode8bitHexColor(color))

def decode8bitHexColor(s):
    return [4 * int(s.lstrip('#')[i:i+2], 16) for i in [0, 2, 4]]
def encode8bitHexColor(color):
    return "#%02X%02X%02X" % (color[0] // 4, color[1] // 4, color[2] // 4)

class Application(cyclone.web.Application):
    def __init__(self, chain):
        handlers = [
            (r"/", IndexHandler),
            (r"/brite/(\d+)", BriteHandler),
            ]

        settings = {
            "static_path": "./static",
            "template_path": "./template",
            "chain" : chain,
        }

        cyclone.web.Application.__init__(self, handlers, **settings)

def main():
    parser = argparse.ArgumentParser(description='drive sba lights')
    parser.add_argument('-v', '--verbose', action="store_true", help='logging')
    args = parser.parse_args()

    try:
        sba = Sba()
    except serial.SerialException:
        sba = Sba("/dev/ttyACM1")

    chain = BriteChain(sba)

    if 0: # todo: stick test patterns like this on some other resource
        while 1:
            t1 = time.time()
            steps = 0
            for x in range(0, 1024, 5):
                steps += 1
                sba.rgbs([(x, x, x)] * 2)
            print steps / (time.time() - t1)

    if args.verbose:
        log.startLogging(sys.stdout)
    reactor.listenTCP(9060, Application(chain))
    reactor.run()

if __name__ == "__main__":
    main()