import glob, time, logging, ast
from bisect import bisect_left,bisect
import louie as dispatcher
from rdflib import Literal
from bcf2000 import BCF2000
log = logging.getLogger()
# todo: move to config, consolidate with ascoltami, musicPad, etc
introPad = 4
postPad = 4
class Curve(object):
"""curve does not know its name. see Curveset"""
def __init__(self):
self.points = [] # x-sorted list of (x,y)
self._muted = False
def __repr__(self):
return "<%s (%s points)>" % (self.__class__.__name__, len(self.points))
def muted():
doc = "Whether to currently send levels (boolean, obviously)"
def fget(self):
return self._muted
def fset(self, val):
self._muted = val
dispatcher.send('mute changed', sender=self)
return locals()
muted = property(**muted())
def toggleMute(self):
self.muted = not self.muted
def load(self,filename):
for line in file(filename):
x, y = line.split()
self.points.append((float(x), ast.literal_eval(y)))
dispatcher.send("points changed",sender=self)
def set_from_string(self, pts):
self.points[:] = []
vals = pts.split()
pairs = zip(vals[0::2], vals[1::2])
for x, y in pairs:
self.points.append((float(x), ast.literal_eval(y)))
dispatcher.send("points changed",sender=self)
def save(self,filename):
if filename.endswith('-music') or filename.endswith('_music'):
print "not saving music track"
f = file(filename,'w')
for p in self.points:
f.write("%s %r\n" % p)
def eval(self, t, allow_muting=True):
if self.muted and allow_muting:
return 0
i = bisect_left(self.points,(t,None))-1
if i == -1:
return self.points[0][1]
if self.points[i][0]>t:
return self.points[i][1]
if i>=len(self.points)-1:
return self.points[i][1]
p1,p2 = self.points[i],self.points[i+1]
frac = (t-p1[0])/(p2[0]-p1[0])
y = p1[1]+(p2[1]-p1[1])*frac
return y
def insert_pt(self, new_pt):
"""returns index of new point"""
i = bisect(self.points, (new_pt[0],None))
# missing a check that this isn't the same X as the neighbor point
return i
def set_points(self, updates):
for i, pt in updates:
self.points[i] = pt
x = None
for p in self.points:
if p[0] <= x:
raise ValueError("overlapping points")
x = p[0]
def pop_point(self, i):
return self.points.pop(i)
def remove_point(self, pt):
def indices_between(self, x1, x2, beyond=0):
leftidx = max(0, bisect(self.points, (x1,None)) - beyond)
rightidx = min(len(self.points),
bisect(self.points, (x2,None)) + beyond)
return range(leftidx, rightidx)
def points_between(self, x1, x2):
return [self.points[i] for i in self.indices_between(x1,x2)]
def point_before(self, x):
"""(x,y) of the point left of x, or None"""
leftidx = self.index_before(x)
if leftidx is None:
return None
return self.points[leftidx]
def index_before(self, x):
leftidx = bisect(self.points, (x,None)) - 1
if leftidx < 0:
return None
return leftidx
class Markers(Curve):
"""Marker is like a point but the y value is a string"""
def eval(self):
raise NotImplementedError()
def slope(p1,p2):
if p2[0] == p1[0]:
return 0
return (p2[1] - p1[1]) / (p2[0] - p1[0])
class Sliders(BCF2000):
def __init__(self, cb, knobCallback, knobButtonCallback):
self.cb = cb
self.knobCallback = knobCallback
self.knobButtonCallback = knobButtonCallback
def valueIn(self, name, value):
if name.startswith("slider"):
self.cb(int(name[6:]), value / 127)
if name.startswith("knob"):
self.knobCallback(int(name[4:]), value / 127)
if name.startswith("button-knob"):
class Curveset(object):
curves = None # curvename : curve
def __init__(self, sliders=False):
"""sliders=True means support the hardware sliders"""
self.curves = {} # name (str) : Curve
self.curveName = {} # reverse
self.sliderCurve = {} # slider number (1 based) : curve name
self.sliderNum = {} # reverse
if sliders:
self.sliders = Sliders(self.hw_slider_in, self.hw_knob_in,
dispatcher.connect(self.curvesToSliders, "curves to sliders")
dispatcher.connect(self.knobOut, "knob out")
self.lastSliderTime = {} # num : time
self.sliderSuppressOutputUntil = {} # num : time
self.sliderIgnoreInputUntil = {}
self.sliders = None
self.markers = Markers()
def sorter(self, name):
return (not name in ['music', 'smooth_music'], name)
def load(self,basename, skipMusic=False):
"""find all files that look like basename-curvename and add
curves with their contents
This fires 'add_curve' dispatcher events to announce the new curves.
""""Curveset.load %s", basename)
self.markers = Markers()
for filename in sorted(glob.glob("%s-*"%basename), key=self.sorter):
curvename = filename[filename.rfind('-')+1:]
if skipMusic and curvename in ['music', 'smooth_music']:
curvename = curvename.replace('-','_')
self.markers.load("%s.markers" % basename)
except IOError:
print "no marker file found"
def save(self,basename):
"""writes a file for each curve with a name
like basename-curvename"""
for name,cur in self.curves.items():"%s-%s" % (basename,name))"%s.markers" % basename)
def curveNamesInOrder(self):
return sorted(self.curves.keys(), key=self.sorter)
def add_curve(self,name,curve):
if isinstance(name, Literal):
name = str(name)
if name in self.curves:
raise ValueError("can't add a second curve named %r" % name)
self.curves[name] = curve
self.curveName[curve] = name
if self.sliders and name not in ['smooth_music', 'music']:
num = len(self.sliderCurve) + 1
if num <= 8:
self.sliderCurve[num] = name
self.sliderNum[name] = num
num = None
num = None
dispatcher.send("add_curve", slider=num, knobEnabled=num is not None,
sender=self, name=name)
def globalsdict(self):
return self.curves.copy()
def get_time_range(self):
return 0, dispatcher.send("get max time")[0][1]
def new_curve(self, name, renameIfExisting=True):
if isinstance(name, Literal):
name = str(name)
if name=="":
print "no name given"
if not renameIfExisting and name in self.curves:
while name in self.curves:
c = Curve()
s,e = self.get_time_range()
c.points.extend([(s,0), (e,0)])
def hw_slider_in(self, num, value):
curve = self.curves[self.sliderCurve[num]]
except KeyError:
now = time.time()
if now < self.sliderIgnoreInputUntil.get(num):
# don't make points too fast. This is the minimum spacing
# between slider-generated points.
self.sliderIgnoreInputUntil[num] = now + .1
# don't push back on the slider for a little while, since the
# user might be trying to slowly move it. This should be
# bigger than the ignore time above.
self.sliderSuppressOutputUntil[num] = now + .2
dispatcher.send("set key", curve=curve, value=value)
def hw_knob_in(self, num, value):
curve = self.curves[self.sliderCurve[num]]
except KeyError:
dispatcher.send("knob in", curve=curve, value=value)
def hw_knob_button(self, num):
curve = self.curves[self.sliderCurve[num]]
except KeyError:
dispatcher.send("set key", curve=curve)
def curvesToSliders(self, t):
now = time.time()
for num, name in self.sliderCurve.items():
if now < self.sliderSuppressOutputUntil.get(num):
# self.lastSliderTime[num] = now
value = self.curves[name].eval(t)
self.sliders.valueOut("slider%s" % num, value * 127)
def knobOut(self, curve, value):
num = self.sliderNum[self.curveName[curve]]
except KeyError:
self.sliders.valueOut("knob%s" % num, value * 127)