Mercurial > code > home > repos > light9
diff bin/attic/keyboardcomposer @ 2376:4556eebe5d73
topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
author | drewp@bigasterisk.com |
---|---|
date | Sun, 12 May 2024 19:02:10 -0700 |
parents | bin/keyboardcomposer@1241e61fcf74 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/attic/keyboardcomposer Sun May 12 19:02:10 2024 -0700 @@ -0,0 +1,712 @@ +#!bin/python + +from run_local import log + +from optparse import OptionParser +from typing import Any, Dict, Tuple, List +import cgi, time, logging +import imp +import tkinter.tix as tk + +from louie import dispatcher +from rdflib import URIRef, Literal +from twisted.internet import reactor, tksupport +from twisted.web import resource +import webcolors, colorsys + +from bcf2000 import BCF2000 +from light9 import clientsession +from light9 import showconfig, networking, prof +from light9.Fadable import Fadable +from light9.effect.sequencer import CodeWatcher +from light9.effect.settings import DeviceSettings +from light9.effect.simple_outputs import SimpleOutputs +from light9.namespaces import L9, RDF, RDFS +from light9.subclient import SubClient +from light9.tkdnd import initTkdnd, dragSourceRegister, dropTargetRegister +from light9.uihelpers import toplevelat +from rdfdb.patch import Patch +from rdfdb.syncedgraph import SyncedGraph +import light9.effect.effecteval + +nudge_keys = {'up': list('qwertyui'), 'down': list('asdfghjk')} + + +class DummySliders: + + def valueOut(self, name, value): + pass + + def close(self): + pass + + def reopen(self): + pass + + +class SubScale(tk.Scale, Fadable): + + def __init__(self, master, *args, **kw): + self.scale_var = kw.get('variable') or tk.DoubleVar() + kw.update({ + 'variable': self.scale_var, + 'from': 1., + 'to': 0., + 'showvalue': 0, + 'sliderlength': 15, + 'res': 0.001, + 'width': 40, + 'troughcolor': 'black', + 'bg': 'grey40', + 'highlightthickness': 1, + 'bd': 1, + 'highlightcolor': 'red', + 'highlightbackground': 'black', + 'activebackground': 'red' + }) + tk.Scale.__init__(self, master, *args, **kw) + Fadable.__init__(self, var=self.scale_var, wheel_step=0.05) + self.draw_indicator_colors() + + def draw_indicator_colors(self): + if self.scale_var.get() == 0: + self['troughcolor'] = 'black' + else: + self['troughcolor'] = 'blue' + + +class SubmasterBox(tk.Frame): + """ + this object owns the level of the submaster (the rdf graph is the + real authority) + + This leaks handlers or DoubleVars or something and tries to just + skip the obsolete ones. It'll get slower and bigger over + time. todo: make aa web version. + """ + + def __init__(self, master, graph, sub, session, col, row): + self.graph = graph + self.sub = sub + self.session = session + self.col, self.row = col, row + bg = self.graph.value(sub, L9['color'], default='#000000') + rgb = webcolors.hex_to_rgb(bg) + hsv = colorsys.rgb_to_hsv(*[x / 255 for x in rgb]) + darkBg = webcolors.rgb_to_hex( + tuple([ + int(x * 255) for x in colorsys.hsv_to_rgb(hsv[0], hsv[1], .2) + ])) + tk.Frame.__init__(self, master, bd=1, relief='raised', bg=bg) + self.name = self.graph.label(sub) + self._val = 0.0 + self.slider_var = tk.DoubleVar() + self.pauseTrace = False + self.scale = SubScale(self, variable=self.slider_var, width=20) + self.dead = False + + self.namelabel = tk.Label(self, + font="Arial 9", + bg=darkBg, + fg='white', + pady=0) + self.graph.addHandler(self.updateName) + + self.namelabel.pack(side=tk.TOP) + self.levellabel = tk.Label(self, + textvariable=self.slider_var, + font="Arial 6", + bg='black', + fg='white', + pady=0) + self.levellabel.pack(side=tk.TOP) + self.scale.pack(side=tk.BOTTOM, expand=1, fill=tk.BOTH) + + for w in [self, self.namelabel, self.levellabel]: + dragSourceRegister(w, 'copy', 'text/uri-list', sub) + + self._slider_var_trace = self.slider_var.trace('w', self.slider_changed) + + self.graph.addHandler(self.updateLevelFromGraph) + + # initial position + # stil need? dispatcher.send("send_to_hw", sub=sub.uri, hwCol=col + 1) + + def getVal(self) -> float: + return self._val + + def setVal(self, newVal: float) -> None: + if self.dead: + return + try: + self.scale.set(newVal) + self.levellabel.config(text=str(newVal)) + except Exception: + log.warning("disabling handlers on broken subbox") + self.dead = True + + def cleanup(self): + self.slider_var.trace_vdelete('w', self._slider_var_trace) + + def slider_changed(self, *args): + self._val = self.scale.get() + self.scale.draw_indicator_colors() + + if self.pauseTrace: + return + self.updateGraphWithLevel(self.sub, self.getVal()) + + # needs fixing: plan is to use dispatcher or a method call to tell a hardware-mapping object who changed, and then it can make io if that's a current hw slider + dispatcher.send("send_to_hw", + sub=self.sub, + hwCol=self.col + 1, + boxRow=self.row) + + def updateGraphWithLevel(self, uri, level): + """in our per-session graph, we maintain SubSetting objects like this: + + ?session :subSetting [a :SubSetting; :sub ?s; :level ?l] + """ + # move to syncedgraph patchMapping + + self.graph.patchMapping(context=self.session, + subject=self.session, + predicate=L9['subSetting'], + nodeClass=L9['SubSetting'], + keyPred=L9['sub'], + newKey=uri, + valuePred=L9['level'], + newValue=Literal(level)) + + def updateLevelFromGraph(self): + """read rdf level, write it to subbox.slider_var""" + # move this to syncedgraph readMapping + graph = self.graph + + for setting in graph.objects(self.session, L9['subSetting']): + if graph.value(setting, L9['sub']) == self.sub: + self.pauseTrace = True # don't bounce this update back to server + try: + self.setVal(graph.value(setting, L9['level']).toPython()) + finally: + self.pauseTrace = False + + def updateName(self): + if self.scale is None: + return + + def shortUri(u): + return '.../' + u.split('/')[-1] + + try: + self.namelabel.config( + text=self.graph.label(self.sub) or shortUri(self.sub)) + except Exception: + log.warn("disabling handlers on broken subbox") + self.scale = None + + +class KeyboardComposer(tk.Frame, SubClient): + + def __init__(self, + root: tk.Tk, + graph: SyncedGraph, + session: URIRef, + hw_sliders=True): + tk.Frame.__init__(self, root, bg='black') + SubClient.__init__(self) + self.graph = graph + self.session = session + + self.subbox: Dict[URIRef, SubmasterBox] = {} # sub uri : SubmasterBox + self.slider_table: Dict[Tuple[int, int], SubmasterBox] = { + } # coords : SubmasterBox + self.rows: List[tk.Frame] = [] # this holds Tk Frames for each row + + self.current_row = 0 # should come from session graph + + self.use_hw_sliders = hw_sliders + self.connect_to_hw(hw_sliders) + + self.make_key_hints() + self.make_buttons() + + self.graph.addHandler(self.redraw_sliders) + + self.codeWatcher = CodeWatcher( + onChange=lambda: self.graph.addHandler(self.redraw_sliders)) + + self.send_levels_loop(periodSec=.05) + self.graph.addHandler(self.rowFromGraph) + + def make_buttons(self): + self.buttonframe = tk.Frame(self, bg='black') + self.buttonframe.pack(side=tk.BOTTOM) + + self.sliders_status_var = tk.IntVar() + self.sliders_status_var.set(self.use_hw_sliders) + self.sliders_checkbutton = tk.Checkbutton( + self.buttonframe, + text="Sliders", + variable=self.sliders_status_var, + command=lambda: self.toggle_slider_connectedness(), + bg='black', + fg='white') + self.sliders_checkbutton.pack(side=tk.LEFT) + + self.alltozerobutton = tk.Button(self.buttonframe, + text="All to Zero", + command=self.alltozero, + bg='black', + fg='white') + self.alltozerobutton.pack(side='left') + + self.save_stage_button = tk.Button( + self.buttonframe, + text="Save", + command=lambda: self.save_current_stage(self.sub_name.get()), + bg='black', + fg='white') + self.save_stage_button.pack(side=tk.LEFT) + self.sub_name = tk.Entry(self.buttonframe, bg='black', fg='white') + self.sub_name.pack(side=tk.LEFT) + + def redraw_sliders(self) -> None: + self.draw_sliders() + if len(self.rows): + self.change_row(self.current_row) + self.rows[self.current_row].focus() + + self.stop_frequent_update_time = 0 + + def draw_sliders(self): + for r in self.rows: + r.destroy() + self.rows = [] + for b in list(self.subbox.values()): + b.cleanup() + self.subbox.clear() + self.slider_table.clear() + + self.tk_focusFollowsMouse() + + rowcount = -1 + col = 0 + last_group = None + + withgroups = [] + for effect in self.graph.subjects(RDF.type, L9['Effect']): + withgroups.append((self.graph.value(effect, L9['group']), + self.graph.value(effect, L9['order']), + self.graph.label(effect), effect)) + withgroups.sort() + + log.debug("withgroups %s", withgroups) + + self.effectEval: Dict[URIRef, light9.effect.effecteval.EffectEval] = {} + imp.reload(light9.effect.effecteval) + simpleOutputs = SimpleOutputs(self.graph) + for group, order, sortLabel, effect in withgroups: + if col == 0 or group != last_group: + row = self.make_row(group) + rowcount += 1 + col = 0 + + subbox = SubmasterBox(row, self.graph, effect, self.session, col, + rowcount) + subbox.place(relx=col / 8, rely=0, relwidth=1 / 8, relheight=1) + self.subbox[effect] = self.slider_table[(rowcount, col)] = subbox + + self.setup_key_nudgers(subbox.scale) + + self.effectEval[effect] = light9.effect.effecteval.EffectEval( + self.graph, effect, simpleOutputs) + + col = (col + 1) % 8 + last_group = group + + def toggle_slider_connectedness(self): + self.use_hw_sliders = not self.use_hw_sliders + if self.use_hw_sliders: + self.sliders.reopen() + else: + self.sliders.close() + self.change_row(self.current_row) + self.rows[self.current_row].focus() + + def connect_to_hw(self, hw_sliders): + log.info('connect_to_hw') + if hw_sliders: + try: + self.sliders = Sliders(self) + log.info("connected to sliders") + except IOError as e: + log.info("no hardware sliders %r", e) + self.sliders = DummySliders() + self.use_hw_sliders = False + dispatcher.connect(self.send_to_hw, 'send_to_hw') + else: + self.sliders = DummySliders() + + def make_key_hints(self): + keyhintrow = tk.Frame(self) + + col = 0 + for upkey, downkey in zip(nudge_keys['up'], nudge_keys['down']): + # what a hack! + downkey = downkey.replace('semicolon', ';') + upkey, downkey = (upkey.upper(), downkey.upper()) + + # another what a hack! + keylabel = tk.Label(keyhintrow, + text='%s\n%s' % (upkey, downkey), + width=1, + font=('Arial', 10), + bg='red', + fg='white', + anchor='c') + keylabel.pack(side=tk.LEFT, expand=1, fill=tk.X) + col += 1 + + keyhintrow.pack(fill=tk.X, expand=0) + self.keyhints = keyhintrow + + def setup_key_nudgers(self, tkobject): + for d, keys in list(nudge_keys.items()): + for key in keys: + # lowercase makes full=0 + keysym = "<KeyPress-%s>" % key + tkobject.bind(keysym, + lambda evt, num=keys.index(key), d=d: self. + got_nudger(num, d)) + + # uppercase makes full=1 + keysym = "<KeyPress-%s>" % key.upper() + keysym = keysym.replace('SEMICOLON', 'colon') + tkobject.bind(keysym, + lambda evt, num=keys.index(key), d=d: self. + got_nudger(num, d, full=1)) + + # Row changing: + # Page dn, C-n, and ] do down + # Page up, C-p, and ' do up + for key in '<Prior> <Next> <Control-n> <Control-p> ' \ + '<Key-bracketright> <Key-apostrophe>'.split(): + tkobject.bind(key, self.change_row_cb) + + def change_row_cb(self, event): + diff = 1 + if event.keysym in ('Prior', 'p', 'bracketright'): + diff = -1 + self.change_row(self.current_row + diff) + + def rowFromGraph(self): + self.change_row(int( + self.graph.value(self.session, L9['currentRow'], default=0)), + fromGraph=True) + + def change_row(self, row: int, fromGraph=False) -> None: + old_row = self.current_row + self.current_row = row + self.current_row = max(0, self.current_row) + self.current_row = min(len(self.rows) - 1, self.current_row) + try: + row = self.rows[self.current_row] + except IndexError: + # if we're mid-load, this row might still appear soon. If + # we changed interactively, the user is out of bounds and + # needs to be brought back in + if fromGraph: + return + raise + + self.unhighlight_row(old_row) + self.highlight_row(self.current_row) + self.keyhints.pack_configure(before=row) + + if not fromGraph: + self.graph.patchObject(self.session, self.session, L9['currentRow'], + Literal(self.current_row)) + + for col in range(1, 9): + try: + subbox = self.slider_table[(self.current_row, col - 1)] + self.sliders.valueOut("button-upper%d" % col, True) + except KeyError: + # unfilled bottom row has holes (plus rows with incomplete + # groups + self.sliders.valueOut("button-upper%d" % col, False) + self.sliders.valueOut("slider%d" % col, 0) + continue + self.send_to_hw(sub=subbox.sub, hwCol=col, boxRow=self.current_row) + + def got_nudger(self, number, direction, full=0): + try: + subbox = self.slider_table[(self.current_row, number)] + except KeyError: + return + + if direction == 'up': + if full: + subbox.scale.fade(1) + else: + subbox.scale.increase() + else: + if full: + subbox.scale.fade(0) + else: + subbox.scale.decrease() + + def hw_slider_moved(self, col, value): + value = int(value * 100) / 100 + try: + subbox = self.slider_table[(self.current_row, col)] + except KeyError: + return # no slider assigned at that column + + if hasattr(self, 'pendingHwSet'): + import twisted.internet.error + try: + self.pendingHwSet.cancel() + except twisted.internet.error.AlreadyCalled: + pass + self.pendingHwSet = reactor.callLater(.01, subbox.setVal, value) + + def send_to_hw(self, sub, hwCol, boxRow): + if isinstance(self.sliders, DummySliders): + return + + assert isinstance(sub, URIRef), repr(sub) + + if boxRow != self.current_row: + return + + try: + level = self.get_levels()[sub] + except KeyError: + log.warn("%r not in %r", sub, self.get_levels()) + raise + v = round(127 * level) + chan = "slider%s" % hwCol + + # workaround for some rounding issue, where we receive one + # value and then decide to send back a value that's one step + # lower. -5 is a fallback for having no last value. hopefully + # we won't really see it + if abs(v - self.sliders.lastValue.get(chan, -5)) <= 1: + return + self.sliders.valueOut(chan, v) + + def make_row(self, group): + """group is a URI or None""" + row = tk.Frame(self, bd=2, bg='black') + row.subGroup = group + + def onDrop(ev): + self.change_group(sub=URIRef(ev.data), row=row) + return "link" + + dropTargetRegister(row, + onDrop=onDrop, + typeList=['*'], + hoverStyle=dict(background="#555500")) + + row.pack(expand=1, fill=tk.BOTH) + self.setup_key_nudgers(row) + self.rows.append(row) + return row + + def change_group(self, sub, row): + """update this sub's group, and maybe other sub groups as needed, so + this sub displays in this row""" + group = row.subGroup + self.graph.patchObject(context=self.session, + subject=sub, + predicate=L9['group'], + newObject=group) + + def highlight_row(self, row): + row = self.rows[row] + row['bg'] = 'red' + + def unhighlight_row(self, row): + row = self.rows[row] + row['bg'] = 'black' + + def get_levels(self): + return dict([ + (uri, box.getVal()) for uri, box in list(self.subbox.items()) + ]) + + def get_output_settings(self, _graph=None): + _graph = _graph or self.graph + outputSettings = [] + for setting in _graph.objects(self.session, L9['subSetting']): + effect = _graph.value(setting, L9['sub']) + strength = _graph.value(setting, L9['level']) + if strength: + now = time.time() + out, report = self.effectEval[effect].outputFromEffect( + [(L9['strength'], strength)], + songTime=now, + # should be counting from when you bumped up from 0 + noteTime=now) + outputSettings.append(out) + + return DeviceSettings.fromList(_graph, outputSettings) + + def save_current_stage(self, subname): + log.info("saving current levels as %s", subname) + with self.graph.currentState() as g: + ds = self.get_output_settings(_graph=g) + effect = L9['effect/%s' % subname] + ctx = URIRef(showconfig.showUri() + '/effect/' + subname) + stmts = ds.statements(effect, ctx, effect + '/', set()) + stmts.extend([ + (effect, RDF.type, L9['Effect'], ctx), + (effect, RDFS.label, Literal(subname), ctx), + (effect, L9['publishAttr'], L9['strength'], ctx), + ]) + + self.graph.suggestPrefixes(ctx, {'eff': effect + '/'}) + self.graph.patch(Patch(addQuads=stmts, delQuads=[])) + + self.sub_name.delete(0, tk.END) + + def alltozero(self): + for uri, subbox in list(self.subbox.items()): + if subbox.scale.scale_var.get() != 0: + subbox.scale.fade(value=0.0, length=0) + + +# move to web lib +def postArgGetter(request): + """return a function that takes arg names and returns string + values. Supports args encoded in the url or in postdata. No + support for repeated args.""" + # this is something nevow normally does for me + request.content.seek(0) + fields = cgi.FieldStorage(request.content, + request.received_headers, + environ={'REQUEST_METHOD': 'POST'}) + + def getArg(n): + try: + return request.args[n][0] + except KeyError: + return fields[n].value + + return getArg + + +class LevelServerHttp(resource.Resource): + isLeaf = True + + def __init__(self, name_to_subbox): + self.name_to_subbox = name_to_subbox + + def render_POST(self, request): + arg = postArgGetter(request) + + if request.path == '/fadesub': + # fadesub?subname=scoop&level=0&secs=.2 + self.name_to_subbox[arg('subname')].scale.fade( + float(arg('level')), float(arg('secs'))) + return "set %s to %s" % (arg('subname'), arg('level')) + else: + raise NotImplementedError(repr(request)) + + +class Sliders(BCF2000): + + def __init__(self, kc): + devices = [ + '/dev/snd/midiC3D0', '/dev/snd/midiC2D0', '/dev/snd/midiC1D0' + ] + for dev in devices: + try: + log.info('try sliders on %s', dev) + BCF2000.__init__(self, dev=dev) + except IOError: + if dev is devices[-1]: + raise + else: + break + + self.kc = kc + log.info('found sliders on %s', dev) + + def valueIn(self, name, value): + kc = self.kc + if name.startswith("slider"): + kc.hw_slider_moved(int(name[6:]) - 1, value / 127) + elif name.startswith("button-upper"): + kc.change_row(kc.current_row) + elif name.startswith("button-lower"): + col = int(name[12:]) - 1 + self.valueOut(name, 0) + try: + tkslider = kc.slider_table[(kc.current_row, col)] + except KeyError: + return + + if tkslider.getVal() == 1.0: + tkslider.setVal(0.0) + else: + tkslider.setVal(1.0) + elif name.startswith("button-corner"): + button_num = int(name[13:]) - 1 + if button_num == 1: + diff = -1 + elif button_num == 3: + diff = 1 + else: + return + + kc.change_row(kc.current_row + diff) + self.valueOut(name, 0) + + +def launch(opts: Any, root: tk.Tk, graph: SyncedGraph, session: URIRef): + tl = toplevelat("Keyboard Composer - %s" % opts.session, + existingtoplevel=root, + graph=graph, + session=session) + + kc = KeyboardComposer(tl, graph, session, hw_sliders=not opts.no_sliders) + kc.pack(fill=tk.BOTH, expand=1) + + for helpline in ["Bindings: B3 mute"]: + tk.Label(root, text=helpline, font="Helvetica -12 italic", + anchor='w').pack(side='top', fill='x') + + +if __name__ == "__main__": + parser = OptionParser() + parser.add_option('--no-sliders', + action='store_true', + help="don't attach to hardware sliders") + clientsession.add_option(parser) + parser.add_option('-v', action='store_true', help="log info level") + opts, args = parser.parse_args() + + log.setLevel(logging.DEBUG if opts.v else logging.INFO) + logging.getLogger('colormath').setLevel(logging.INFO) + + graph = SyncedGraph(networking.rdfdb.url, "keyboardcomposer") + + # i think this also needs delayed start (like subcomposer has), to have a valid graph + # before setting any stuff from the ui + + root = tk.Tk() + initTkdnd(root.tk, 'tkdnd/trunk/') + + session = clientsession.getUri('keyboardcomposer', opts) + + graph.initiallySynced.addCallback(lambda _: launch(opts, root, graph, + session)) + + root.protocol('WM_DELETE_WINDOW', reactor.stop) + + tksupport.install(root, ms=20) + prof.run(reactor.run, profile=None)