diff bin/attic/keyboardcomposer @ 2376:4556eebe5d73

topdir reorgs; let pdm have its src/ dir; separate vite area from light9/
author drewp@bigasterisk.com
date Sun, 12 May 2024 19:02:10 -0700
parents bin/keyboardcomposer@1241e61fcf74
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/bin/attic/keyboardcomposer	Sun May 12 19:02:10 2024 -0700
@@ -0,0 +1,712 @@
+#!bin/python
+
+from run_local import log
+
+from optparse import OptionParser
+from typing import Any, Dict, Tuple, List
+import cgi, time, logging
+import imp
+import tkinter.tix as tk
+
+from louie import dispatcher
+from rdflib import URIRef, Literal
+from twisted.internet import reactor, tksupport
+from twisted.web import resource
+import webcolors, colorsys
+
+from bcf2000 import BCF2000
+from light9 import clientsession
+from light9 import showconfig, networking, prof
+from light9.Fadable import Fadable
+from light9.effect.sequencer import CodeWatcher
+from light9.effect.settings import DeviceSettings
+from light9.effect.simple_outputs import SimpleOutputs
+from light9.namespaces import L9, RDF, RDFS
+from light9.subclient import SubClient
+from light9.tkdnd import initTkdnd, dragSourceRegister, dropTargetRegister
+from light9.uihelpers import toplevelat
+from rdfdb.patch import Patch
+from rdfdb.syncedgraph import SyncedGraph
+import light9.effect.effecteval
+
+nudge_keys = {'up': list('qwertyui'), 'down': list('asdfghjk')}
+
+
+class DummySliders:
+
+    def valueOut(self, name, value):
+        pass
+
+    def close(self):
+        pass
+
+    def reopen(self):
+        pass
+
+
+class SubScale(tk.Scale, Fadable):
+
+    def __init__(self, master, *args, **kw):
+        self.scale_var = kw.get('variable') or tk.DoubleVar()
+        kw.update({
+            'variable': self.scale_var,
+            'from': 1.,
+            'to': 0.,
+            'showvalue': 0,
+            'sliderlength': 15,
+            'res': 0.001,
+            'width': 40,
+            'troughcolor': 'black',
+            'bg': 'grey40',
+            'highlightthickness': 1,
+            'bd': 1,
+            'highlightcolor': 'red',
+            'highlightbackground': 'black',
+            'activebackground': 'red'
+        })
+        tk.Scale.__init__(self, master, *args, **kw)
+        Fadable.__init__(self, var=self.scale_var, wheel_step=0.05)
+        self.draw_indicator_colors()
+
+    def draw_indicator_colors(self):
+        if self.scale_var.get() == 0:
+            self['troughcolor'] = 'black'
+        else:
+            self['troughcolor'] = 'blue'
+
+
+class SubmasterBox(tk.Frame):
+    """
+    this object owns the level of the submaster (the rdf graph is the
+    real authority)
+
+    This leaks handlers or DoubleVars or something and tries to just
+    skip the obsolete ones. It'll get slower and bigger over
+    time. todo: make aa web version.
+    """
+
+    def __init__(self, master, graph, sub, session, col, row):
+        self.graph = graph
+        self.sub = sub
+        self.session = session
+        self.col, self.row = col, row
+        bg = self.graph.value(sub, L9['color'], default='#000000')
+        rgb = webcolors.hex_to_rgb(bg)
+        hsv = colorsys.rgb_to_hsv(*[x / 255 for x in rgb])
+        darkBg = webcolors.rgb_to_hex(
+            tuple([
+                int(x * 255) for x in colorsys.hsv_to_rgb(hsv[0], hsv[1], .2)
+            ]))
+        tk.Frame.__init__(self, master, bd=1, relief='raised', bg=bg)
+        self.name = self.graph.label(sub)
+        self._val = 0.0
+        self.slider_var = tk.DoubleVar()
+        self.pauseTrace = False
+        self.scale = SubScale(self, variable=self.slider_var, width=20)
+        self.dead = False
+
+        self.namelabel = tk.Label(self,
+                                  font="Arial 9",
+                                  bg=darkBg,
+                                  fg='white',
+                                  pady=0)
+        self.graph.addHandler(self.updateName)
+
+        self.namelabel.pack(side=tk.TOP)
+        self.levellabel = tk.Label(self,
+                                   textvariable=self.slider_var,
+                                   font="Arial 6",
+                                   bg='black',
+                                   fg='white',
+                                   pady=0)
+        self.levellabel.pack(side=tk.TOP)
+        self.scale.pack(side=tk.BOTTOM, expand=1, fill=tk.BOTH)
+
+        for w in [self, self.namelabel, self.levellabel]:
+            dragSourceRegister(w, 'copy', 'text/uri-list', sub)
+
+        self._slider_var_trace = self.slider_var.trace('w', self.slider_changed)
+
+        self.graph.addHandler(self.updateLevelFromGraph)
+
+        # initial position
+        # stil need? dispatcher.send("send_to_hw", sub=sub.uri, hwCol=col + 1)
+
+    def getVal(self) -> float:
+        return self._val
+
+    def setVal(self, newVal: float) -> None:
+        if self.dead:
+            return
+        try:
+            self.scale.set(newVal)
+            self.levellabel.config(text=str(newVal))
+        except Exception:
+            log.warning("disabling handlers on broken subbox")
+            self.dead = True
+
+    def cleanup(self):
+        self.slider_var.trace_vdelete('w', self._slider_var_trace)
+
+    def slider_changed(self, *args):
+        self._val = self.scale.get()
+        self.scale.draw_indicator_colors()
+
+        if self.pauseTrace:
+            return
+        self.updateGraphWithLevel(self.sub, self.getVal())
+
+        # needs fixing: plan is to use dispatcher or a method call to tell a hardware-mapping object who changed, and then it can make io if that's a current hw slider
+        dispatcher.send("send_to_hw",
+                        sub=self.sub,
+                        hwCol=self.col + 1,
+                        boxRow=self.row)
+
+    def updateGraphWithLevel(self, uri, level):
+        """in our per-session graph, we maintain SubSetting objects like this:
+
+           ?session :subSetting [a :SubSetting; :sub ?s; :level ?l]
+        """
+        # move to syncedgraph patchMapping
+
+        self.graph.patchMapping(context=self.session,
+                                subject=self.session,
+                                predicate=L9['subSetting'],
+                                nodeClass=L9['SubSetting'],
+                                keyPred=L9['sub'],
+                                newKey=uri,
+                                valuePred=L9['level'],
+                                newValue=Literal(level))
+
+    def updateLevelFromGraph(self):
+        """read rdf level, write it to subbox.slider_var"""
+        # move this to syncedgraph readMapping
+        graph = self.graph
+
+        for setting in graph.objects(self.session, L9['subSetting']):
+            if graph.value(setting, L9['sub']) == self.sub:
+                self.pauseTrace = True  # don't bounce this update back to server
+                try:
+                    self.setVal(graph.value(setting, L9['level']).toPython())
+                finally:
+                    self.pauseTrace = False
+
+    def updateName(self):
+        if self.scale is None:
+            return
+
+        def shortUri(u):
+            return '.../' + u.split('/')[-1]
+
+        try:
+            self.namelabel.config(
+                text=self.graph.label(self.sub) or shortUri(self.sub))
+        except Exception:
+            log.warn("disabling handlers on broken subbox")
+            self.scale = None
+
+
+class KeyboardComposer(tk.Frame, SubClient):
+
+    def __init__(self,
+                 root: tk.Tk,
+                 graph: SyncedGraph,
+                 session: URIRef,
+                 hw_sliders=True):
+        tk.Frame.__init__(self, root, bg='black')
+        SubClient.__init__(self)
+        self.graph = graph
+        self.session = session
+
+        self.subbox: Dict[URIRef, SubmasterBox] = {}  # sub uri : SubmasterBox
+        self.slider_table: Dict[Tuple[int, int], SubmasterBox] = {
+        }  # coords : SubmasterBox
+        self.rows: List[tk.Frame] = []  # this holds Tk Frames for each row
+
+        self.current_row = 0  # should come from session graph
+
+        self.use_hw_sliders = hw_sliders
+        self.connect_to_hw(hw_sliders)
+
+        self.make_key_hints()
+        self.make_buttons()
+
+        self.graph.addHandler(self.redraw_sliders)
+
+        self.codeWatcher = CodeWatcher(
+            onChange=lambda: self.graph.addHandler(self.redraw_sliders))
+
+        self.send_levels_loop(periodSec=.05)
+        self.graph.addHandler(self.rowFromGraph)
+
+    def make_buttons(self):
+        self.buttonframe = tk.Frame(self, bg='black')
+        self.buttonframe.pack(side=tk.BOTTOM)
+
+        self.sliders_status_var = tk.IntVar()
+        self.sliders_status_var.set(self.use_hw_sliders)
+        self.sliders_checkbutton = tk.Checkbutton(
+            self.buttonframe,
+            text="Sliders",
+            variable=self.sliders_status_var,
+            command=lambda: self.toggle_slider_connectedness(),
+            bg='black',
+            fg='white')
+        self.sliders_checkbutton.pack(side=tk.LEFT)
+
+        self.alltozerobutton = tk.Button(self.buttonframe,
+                                         text="All to Zero",
+                                         command=self.alltozero,
+                                         bg='black',
+                                         fg='white')
+        self.alltozerobutton.pack(side='left')
+
+        self.save_stage_button = tk.Button(
+            self.buttonframe,
+            text="Save",
+            command=lambda: self.save_current_stage(self.sub_name.get()),
+            bg='black',
+            fg='white')
+        self.save_stage_button.pack(side=tk.LEFT)
+        self.sub_name = tk.Entry(self.buttonframe, bg='black', fg='white')
+        self.sub_name.pack(side=tk.LEFT)
+
+    def redraw_sliders(self) -> None:
+        self.draw_sliders()
+        if len(self.rows):
+            self.change_row(self.current_row)
+            self.rows[self.current_row].focus()
+
+        self.stop_frequent_update_time = 0
+
+    def draw_sliders(self):
+        for r in self.rows:
+            r.destroy()
+        self.rows = []
+        for b in list(self.subbox.values()):
+            b.cleanup()
+        self.subbox.clear()
+        self.slider_table.clear()
+
+        self.tk_focusFollowsMouse()
+
+        rowcount = -1
+        col = 0
+        last_group = None
+
+        withgroups = []
+        for effect in self.graph.subjects(RDF.type, L9['Effect']):
+            withgroups.append((self.graph.value(effect, L9['group']),
+                               self.graph.value(effect, L9['order']),
+                               self.graph.label(effect), effect))
+        withgroups.sort()
+
+        log.debug("withgroups %s", withgroups)
+
+        self.effectEval: Dict[URIRef, light9.effect.effecteval.EffectEval] = {}
+        imp.reload(light9.effect.effecteval)
+        simpleOutputs = SimpleOutputs(self.graph)
+        for group, order, sortLabel, effect in withgroups:
+            if col == 0 or group != last_group:
+                row = self.make_row(group)
+                rowcount += 1
+                col = 0
+
+            subbox = SubmasterBox(row, self.graph, effect, self.session, col,
+                                  rowcount)
+            subbox.place(relx=col / 8, rely=0, relwidth=1 / 8, relheight=1)
+            self.subbox[effect] = self.slider_table[(rowcount, col)] = subbox
+
+            self.setup_key_nudgers(subbox.scale)
+
+            self.effectEval[effect] = light9.effect.effecteval.EffectEval(
+                self.graph, effect, simpleOutputs)
+
+            col = (col + 1) % 8
+            last_group = group
+
+    def toggle_slider_connectedness(self):
+        self.use_hw_sliders = not self.use_hw_sliders
+        if self.use_hw_sliders:
+            self.sliders.reopen()
+        else:
+            self.sliders.close()
+        self.change_row(self.current_row)
+        self.rows[self.current_row].focus()
+
+    def connect_to_hw(self, hw_sliders):
+        log.info('connect_to_hw')
+        if hw_sliders:
+            try:
+                self.sliders = Sliders(self)
+                log.info("connected to sliders")
+            except IOError as e:
+                log.info("no hardware sliders %r", e)
+                self.sliders = DummySliders()
+                self.use_hw_sliders = False
+            dispatcher.connect(self.send_to_hw, 'send_to_hw')
+        else:
+            self.sliders = DummySliders()
+
+    def make_key_hints(self):
+        keyhintrow = tk.Frame(self)
+
+        col = 0
+        for upkey, downkey in zip(nudge_keys['up'], nudge_keys['down']):
+            # what a hack!
+            downkey = downkey.replace('semicolon', ';')
+            upkey, downkey = (upkey.upper(), downkey.upper())
+
+            # another what a hack!
+            keylabel = tk.Label(keyhintrow,
+                                text='%s\n%s' % (upkey, downkey),
+                                width=1,
+                                font=('Arial', 10),
+                                bg='red',
+                                fg='white',
+                                anchor='c')
+            keylabel.pack(side=tk.LEFT, expand=1, fill=tk.X)
+            col += 1
+
+        keyhintrow.pack(fill=tk.X, expand=0)
+        self.keyhints = keyhintrow
+
+    def setup_key_nudgers(self, tkobject):
+        for d, keys in list(nudge_keys.items()):
+            for key in keys:
+                # lowercase makes full=0
+                keysym = "<KeyPress-%s>" % key
+                tkobject.bind(keysym,
+                              lambda evt, num=keys.index(key), d=d: self.
+                              got_nudger(num, d))
+
+                # uppercase makes full=1
+                keysym = "<KeyPress-%s>" % key.upper()
+                keysym = keysym.replace('SEMICOLON', 'colon')
+                tkobject.bind(keysym,
+                              lambda evt, num=keys.index(key), d=d: self.
+                              got_nudger(num, d, full=1))
+
+        # Row changing:
+        # Page dn, C-n, and ] do down
+        # Page up, C-p, and ' do up
+        for key in '<Prior> <Next> <Control-n> <Control-p> ' \
+                   '<Key-bracketright> <Key-apostrophe>'.split():
+            tkobject.bind(key, self.change_row_cb)
+
+    def change_row_cb(self, event):
+        diff = 1
+        if event.keysym in ('Prior', 'p', 'bracketright'):
+            diff = -1
+        self.change_row(self.current_row + diff)
+
+    def rowFromGraph(self):
+        self.change_row(int(
+            self.graph.value(self.session, L9['currentRow'], default=0)),
+                        fromGraph=True)
+
+    def change_row(self, row: int, fromGraph=False) -> None:
+        old_row = self.current_row
+        self.current_row = row
+        self.current_row = max(0, self.current_row)
+        self.current_row = min(len(self.rows) - 1, self.current_row)
+        try:
+            row = self.rows[self.current_row]
+        except IndexError:
+            # if we're mid-load, this row might still appear soon. If
+            # we changed interactively, the user is out of bounds and
+            # needs to be brought back in
+            if fromGraph:
+                return
+            raise
+
+        self.unhighlight_row(old_row)
+        self.highlight_row(self.current_row)
+        self.keyhints.pack_configure(before=row)
+
+        if not fromGraph:
+            self.graph.patchObject(self.session, self.session, L9['currentRow'],
+                                   Literal(self.current_row))
+
+        for col in range(1, 9):
+            try:
+                subbox = self.slider_table[(self.current_row, col - 1)]
+                self.sliders.valueOut("button-upper%d" % col, True)
+            except KeyError:
+                # unfilled bottom row has holes (plus rows with incomplete
+                # groups
+                self.sliders.valueOut("button-upper%d" % col, False)
+                self.sliders.valueOut("slider%d" % col, 0)
+                continue
+            self.send_to_hw(sub=subbox.sub, hwCol=col, boxRow=self.current_row)
+
+    def got_nudger(self, number, direction, full=0):
+        try:
+            subbox = self.slider_table[(self.current_row, number)]
+        except KeyError:
+            return
+
+        if direction == 'up':
+            if full:
+                subbox.scale.fade(1)
+            else:
+                subbox.scale.increase()
+        else:
+            if full:
+                subbox.scale.fade(0)
+            else:
+                subbox.scale.decrease()
+
+    def hw_slider_moved(self, col, value):
+        value = int(value * 100) / 100
+        try:
+            subbox = self.slider_table[(self.current_row, col)]
+        except KeyError:
+            return  # no slider assigned at that column
+
+        if hasattr(self, 'pendingHwSet'):
+            import twisted.internet.error
+            try:
+                self.pendingHwSet.cancel()
+            except twisted.internet.error.AlreadyCalled:
+                pass
+        self.pendingHwSet = reactor.callLater(.01, subbox.setVal, value)
+
+    def send_to_hw(self, sub, hwCol, boxRow):
+        if isinstance(self.sliders, DummySliders):
+            return
+
+        assert isinstance(sub, URIRef), repr(sub)
+
+        if boxRow != self.current_row:
+            return
+
+        try:
+            level = self.get_levels()[sub]
+        except KeyError:
+            log.warn("%r not in %r", sub, self.get_levels())
+            raise
+        v = round(127 * level)
+        chan = "slider%s" % hwCol
+
+        # workaround for some rounding issue, where we receive one
+        # value and then decide to send back a value that's one step
+        # lower.  -5 is a fallback for having no last value.  hopefully
+        # we won't really see it
+        if abs(v - self.sliders.lastValue.get(chan, -5)) <= 1:
+            return
+        self.sliders.valueOut(chan, v)
+
+    def make_row(self, group):
+        """group is a URI or None"""
+        row = tk.Frame(self, bd=2, bg='black')
+        row.subGroup = group
+
+        def onDrop(ev):
+            self.change_group(sub=URIRef(ev.data), row=row)
+            return "link"
+
+        dropTargetRegister(row,
+                           onDrop=onDrop,
+                           typeList=['*'],
+                           hoverStyle=dict(background="#555500"))
+
+        row.pack(expand=1, fill=tk.BOTH)
+        self.setup_key_nudgers(row)
+        self.rows.append(row)
+        return row
+
+    def change_group(self, sub, row):
+        """update this sub's group, and maybe other sub groups as needed, so
+        this sub displays in this row"""
+        group = row.subGroup
+        self.graph.patchObject(context=self.session,
+                               subject=sub,
+                               predicate=L9['group'],
+                               newObject=group)
+
+    def highlight_row(self, row):
+        row = self.rows[row]
+        row['bg'] = 'red'
+
+    def unhighlight_row(self, row):
+        row = self.rows[row]
+        row['bg'] = 'black'
+
+    def get_levels(self):
+        return dict([
+            (uri, box.getVal()) for uri, box in list(self.subbox.items())
+        ])
+
+    def get_output_settings(self, _graph=None):
+        _graph = _graph or self.graph
+        outputSettings = []
+        for setting in _graph.objects(self.session, L9['subSetting']):
+            effect = _graph.value(setting, L9['sub'])
+            strength = _graph.value(setting, L9['level'])
+            if strength:
+                now = time.time()
+                out, report = self.effectEval[effect].outputFromEffect(
+                    [(L9['strength'], strength)],
+                    songTime=now,
+                    # should be counting from when you bumped up from 0
+                    noteTime=now)
+                outputSettings.append(out)
+
+        return DeviceSettings.fromList(_graph, outputSettings)
+
+    def save_current_stage(self, subname):
+        log.info("saving current levels as %s", subname)
+        with self.graph.currentState() as g:
+            ds = self.get_output_settings(_graph=g)
+        effect = L9['effect/%s' % subname]
+        ctx = URIRef(showconfig.showUri() + '/effect/' + subname)
+        stmts = ds.statements(effect, ctx, effect + '/', set())
+        stmts.extend([
+            (effect, RDF.type, L9['Effect'], ctx),
+            (effect, RDFS.label, Literal(subname), ctx),
+            (effect, L9['publishAttr'], L9['strength'], ctx),
+        ])
+
+        self.graph.suggestPrefixes(ctx, {'eff': effect + '/'})
+        self.graph.patch(Patch(addQuads=stmts, delQuads=[]))
+
+        self.sub_name.delete(0, tk.END)
+
+    def alltozero(self):
+        for uri, subbox in list(self.subbox.items()):
+            if subbox.scale.scale_var.get() != 0:
+                subbox.scale.fade(value=0.0, length=0)
+
+
+# move to web lib
+def postArgGetter(request):
+    """return a function that takes arg names and returns string
+    values. Supports args encoded in the url or in postdata. No
+    support for repeated args."""
+    # this is something nevow normally does for me
+    request.content.seek(0)
+    fields = cgi.FieldStorage(request.content,
+                              request.received_headers,
+                              environ={'REQUEST_METHOD': 'POST'})
+
+    def getArg(n):
+        try:
+            return request.args[n][0]
+        except KeyError:
+            return fields[n].value
+
+    return getArg
+
+
+class LevelServerHttp(resource.Resource):
+    isLeaf = True
+
+    def __init__(self, name_to_subbox):
+        self.name_to_subbox = name_to_subbox
+
+    def render_POST(self, request):
+        arg = postArgGetter(request)
+
+        if request.path == '/fadesub':
+            # fadesub?subname=scoop&level=0&secs=.2
+            self.name_to_subbox[arg('subname')].scale.fade(
+                float(arg('level')), float(arg('secs')))
+            return "set %s to %s" % (arg('subname'), arg('level'))
+        else:
+            raise NotImplementedError(repr(request))
+
+
+class Sliders(BCF2000):
+
+    def __init__(self, kc):
+        devices = [
+            '/dev/snd/midiC3D0', '/dev/snd/midiC2D0', '/dev/snd/midiC1D0'
+        ]
+        for dev in devices:
+            try:
+                log.info('try sliders on %s', dev)
+                BCF2000.__init__(self, dev=dev)
+            except IOError:
+                if dev is devices[-1]:
+                    raise
+            else:
+                break
+
+        self.kc = kc
+        log.info('found sliders on %s', dev)
+
+    def valueIn(self, name, value):
+        kc = self.kc
+        if name.startswith("slider"):
+            kc.hw_slider_moved(int(name[6:]) - 1, value / 127)
+        elif name.startswith("button-upper"):
+            kc.change_row(kc.current_row)
+        elif name.startswith("button-lower"):
+            col = int(name[12:]) - 1
+            self.valueOut(name, 0)
+            try:
+                tkslider = kc.slider_table[(kc.current_row, col)]
+            except KeyError:
+                return
+
+            if tkslider.getVal() == 1.0:
+                tkslider.setVal(0.0)
+            else:
+                tkslider.setVal(1.0)
+        elif name.startswith("button-corner"):
+            button_num = int(name[13:]) - 1
+            if button_num == 1:
+                diff = -1
+            elif button_num == 3:
+                diff = 1
+            else:
+                return
+
+            kc.change_row(kc.current_row + diff)
+            self.valueOut(name, 0)
+
+
+def launch(opts: Any, root: tk.Tk, graph: SyncedGraph, session: URIRef):
+    tl = toplevelat("Keyboard Composer - %s" % opts.session,
+                    existingtoplevel=root,
+                    graph=graph,
+                    session=session)
+
+    kc = KeyboardComposer(tl, graph, session, hw_sliders=not opts.no_sliders)
+    kc.pack(fill=tk.BOTH, expand=1)
+
+    for helpline in ["Bindings: B3 mute"]:
+        tk.Label(root, text=helpline, font="Helvetica -12 italic",
+                 anchor='w').pack(side='top', fill='x')
+
+
+if __name__ == "__main__":
+    parser = OptionParser()
+    parser.add_option('--no-sliders',
+                      action='store_true',
+                      help="don't attach to hardware sliders")
+    clientsession.add_option(parser)
+    parser.add_option('-v', action='store_true', help="log info level")
+    opts, args = parser.parse_args()
+
+    log.setLevel(logging.DEBUG if opts.v else logging.INFO)
+    logging.getLogger('colormath').setLevel(logging.INFO)
+
+    graph = SyncedGraph(networking.rdfdb.url, "keyboardcomposer")
+
+    # i think this also needs delayed start (like subcomposer has), to have a valid graph
+    # before setting any stuff from the ui
+
+    root = tk.Tk()
+    initTkdnd(root.tk, 'tkdnd/trunk/')
+
+    session = clientsession.getUri('keyboardcomposer', opts)
+
+    graph.initiallySynced.addCallback(lambda _: launch(opts, root, graph,
+                                                       session))
+
+    root.protocol('WM_DELETE_WINDOW', reactor.stop)
+
+    tksupport.install(root, ms=20)
+    prof.run(reactor.run, profile=None)