diff service/sba/sba.py @ 808:867f59c83dba

moved from proj/pixel Ignore-this: 97cafcfb96fd33ea70fcd467659ee31a darcs-hash:20110808045021-312f9-be4547f197b49b893d6e873f9c96f30491d0bdc7.gz
author drewp <drewp@bigasterisk.com>
date Sun, 07 Aug 2011 21:50:21 -0700
parents
children be855a111619
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/service/sba/sba.py	Sun Aug 07 21:50:21 2011 -0700
@@ -0,0 +1,185 @@
+from __future__ import division
+import serial, time, jsonlib, sys, cgi, argparse
+import cyclone.web
+from twisted.python import log
+from twisted.internet import reactor
+
+class Sba(object):
+    def __init__(self, port="/dev/ttyACM0"):
+        self.port = port
+        self.reset()
+
+    def reset(self):
+        log.msg("reopening port")
+        self.s = serial.Serial(self.port, baudrate=115200)
+        log.msg(str(self.s.__dict__))
+        self.sendControl()
+
+    def sendControl(self):
+        controlBits = [0, 1,
+                       0, 0, 0,
+                       1, 1, 1, 1, 1, 1, 1, # b correction
+                       0, 0, 0,
+                       1, 1, 1, 1, 1, 1, 1, # g correction
+                       0,
+                       0, 0, # clock mode 00=internal
+                       1, 1, 1, 1, 1, 1, 1, # r correction
+                       ]
+
+        control = reduce(lambda a, b: a<<1 | b,
+                         #controlBits
+                         reversed(controlBits)
+                         )
+        self.send("C" + hex(control)[2:].zfill(8))
+        self.send("E0")
+        
+    def send(self, cmd, getResponse=True):
+        """
+        send a command using the protocol from http://engr.biz/prod/SB-A/
+
+        we will attach the carriage return, cmd is just a string like 'V'
+
+        Returns the response line, like '+OK'
+        """
+        try:
+            self.s.write(cmd + "\r")
+        except OSError:
+            self.reset()
+            
+        if getResponse:
+            return self.s.readline().strip()
+
+    def rgbs(self, rgbs):
+        """
+        send a set of full rgb packets. Values are 0..1023.
+        """
+        t1 = time.time()
+        for (r,g,b) in rgbs:
+            packed = (b & 0x3ff) << 20 | (r & 0x3ff) << 10 | (g & 0x3ff)
+            self.send("D%08x" % packed, getResponse=False)
+
+        self.send("L1", getResponse=False)
+        self.send("L0", getResponse=False)
+        sends = time.time() - t1
+        # doing all the reads together triples the transmission rate
+        t2 = time.time()
+        [self.s.readline() for loop in range(2 + len(rgbs))]
+        reads = time.time() - t2
+
+        log.msg("%.1f ms for sends, %.1f ms for reads" % (
+            1000 * sends, 1000 * reads))
+
+class BriteChain(object):
+    def __init__(self, sba):
+        self.sba = sba
+        self.colors = []
+
+    def setColor(self, pos, color):
+        """color is (r,g,b) 10-bit int. The highest position you ever
+        set is how many channels we'll output"""
+        if len(self.colors) <= pos:
+            self.colors.extend([(0,0,0)]*(pos - len(self.colors) + 1))
+        self.colors[pos] = color
+        self.refresh()
+        
+    def getColor(self, pos):
+        try:
+            return self.colors[pos]
+        except IndexError:
+            return (0,0,0)
+
+    def refresh(self):
+        self.sba.rgbs(self.colors[::-1])
+
+class IndexHandler(cyclone.web.RequestHandler):
+    def get(self):
+        self.set_header("Content-type", "text/html")
+        self.write(open("sba.html").read())
+
+class BriteHandler(cyclone.web.RequestHandler):
+    """
+    /brite/0 is the first shiftbrite on the chain. Put a text/plain
+    color like #ffffff (8-bit) or a application/json value like
+    {"rgb10":[1023,1023,1023]} (for 10-bit). GET (with accept, to pick
+    your format) to learn the current color. 
+
+    /brite/1 affects the second shiftbrite on the chain, etc 
+    """
+    def put(self, pos):
+        d = self.request.body
+        ctype = self.request.headers.get("Content-Type")
+        if ';' in ctype: 
+            ctype = ctype.split(';')[0].strip()
+        if ctype == 'text/plain':
+            color = decode8bitHexColor(d)
+        elif ctype == 'application/json':
+            color = jsonlib.read(d)['rgb10']
+        elif ctype == 'application/x-www-form-urlencoded':
+            color = decode8bitHexColor(cgi.parse_qs(d)['color'][0])
+        else:
+            self.response.set_status(415, "use text/plain, application/json, "
+                                     "or application/x-www-form-urlencoded")
+            return
+
+        self.settings.chain.setColor(int(pos), color)
+        self.set_header("Content-Type", "text/plain")
+        self.write("set %s\n" % pos)
+
+    def post(self, pos):
+        self.put(pos)
+        self.redirect("..")
+
+    def get(self, pos):
+        # todo: content neg
+        color = self.settings.chain.getColor(int(pos))
+        self.set_header("Content-Type", "text/plain")
+        self.write(encode8bitHexColor(color))
+
+def decode8bitHexColor(s):
+    return [4 * int(s.lstrip('#')[i:i+2], 16) for i in [0, 2, 4]]
+def encode8bitHexColor(color):
+    return "#%02X%02X%02X" % (color[0] // 4, color[1] // 4, color[2] // 4)
+
+class Application(cyclone.web.Application):
+    def __init__(self, chain):
+        handlers = [
+            (r"/", IndexHandler),
+            (r"/brite/(\d+)", BriteHandler),
+            ]
+
+        settings = {
+            "static_path": "./static",
+            "template_path": "./template",
+            "chain" : chain,
+        }
+
+        cyclone.web.Application.__init__(self, handlers, **settings)
+
+def main():
+    parser = argparse.ArgumentParser(description='drive sba lights')
+    parser.add_argument('-v', '--verbose', action="store_true", help='logging')
+    args = parser.parse_args()
+
+    try:
+        sba = Sba()
+    except serial.SerialException:
+        sba = Sba("/dev/ttyACM1")
+
+    chain = BriteChain(sba)
+
+    if 0: # todo: stick test patterns like this on some other resource
+        while 1:
+            t1 = time.time()
+            steps = 0
+            for x in range(0, 1024, 5):
+                steps += 1
+                sba.rgbs([(x, x, x)] * 2)
+            print steps / (time.time() - t1)
+
+    if args.verbose:
+        log.startLogging(sys.stdout)
+    reactor.listenTCP(9060, Application(chain))
+    reactor.run()
+
+if __name__ == "__main__":
+    main()