changeset 2388:3cd80b266561

refactor midifade into separate modules; handle all midi events more reliably, even if rdfdb runs slow
author drewp@bigasterisk.com
date Mon, 13 May 2024 21:39:53 -0700
parents ab48a401ab02
children 0e90dd50e8c4
files src/light9/midifade/eventqueue.py src/light9/midifade/mididevs.py src/light9/midifade/midifade.py src/light9/midifade/pages.py src/light9/midifade/writeback.py
diffstat 5 files changed, 271 insertions(+), 180 deletions(-)
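
The "even if rdfdb runs slow" part of this change is the coalescing in the new eventqueue.py: the run loop drains everything queued since the last pass, and reduceToLatestValue keeps only the newest value per distinct control. For illustration, a quick check of that coalescing with hand-built message dicts (real ones come from mido's Message.dict() plus a 'dev' key):

    from light9.midifade.eventqueue import reduceToLatestValue

    backlog = [
        {'type': 'control_change', 'dev': 'bcf2000', 'control': 81, 'value': 10},
        {'type': 'control_change', 'dev': 'bcf2000', 'control': 81, 'value': 90},
        {'type': 'control_change', 'dev': 'bcf2000', 'control': 82, 'value': 40},
    ]
    # one message survives per (type, dev, control) group, each carrying its latest value
    assert reduceToLatestValue(backlog) == [
        {'type': 'control_change', 'dev': 'bcf2000', 'control': 81, 'value': 90},
        {'type': 'control_change', 'dev': 'bcf2000', 'control': 82, 'value': 40},
    ]
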
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/light9/midifade/eventqueue.py	Mon May 13 21:39:53 2024 -0700
@@ -0,0 +1,49 @@
+import asyncio
+import logging
+import traceback
+
+log = logging.getLogger()
+
+
+class EventQueue:
+    """midi events come in fast; graph consumes them slower"""
+
+    def __init__(self, MAX_SEND_RATE: float, onMessage) -> None:
+        self.MAX_SEND_RATE = MAX_SEND_RATE
+        self.onMessage = onMessage
+        self.msgs = asyncio.Queue()
+
+    def callbackFor(self, dev):
+        mainThreadLoop = asyncio.get_running_loop()
+
+        def cb(message):
+            # this is running in mido's thread
+            log.info(f'enqueue {message} {"*" * message.dict().get("value", 0)}')
+            mainThreadLoop.call_soon_threadsafe(
+                self.msgs.put_nowait,
+                message.dict() | {'dev': dev},
+            )
+
+        return cb
+
+    async def run(self):
+        while True:
+            recents = [await self.msgs.get()]
+            while not self.msgs.empty():
+                recents.append(self.msgs.get_nowait())
+            try:
+                for msg in reduceToLatestValue(recents):
+                    # log.info(f'handle {msg=}')
+                    await self.onMessage(msg)
+            except Exception:
+                traceback.print_exc()
+                log.warning("error in onMessage - continuing anyway")
+            await asyncio.sleep(1 / self.MAX_SEND_RATE)
+
+
+def reduceToLatestValue(ms: list[dict]) -> list[dict]:
+    merge = {}
+    for m in ms:
+        normal_key = tuple(sorted((k, v) for k, v in m.items() if k != 'value'))  # group by all fields but 'value'
+        merge[normal_key] = m
+    return list(merge.values())
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/light9/midifade/mididevs.py	Mon May 13 21:39:53 2024 -0700
@@ -0,0 +1,40 @@
+import logging
+
+import mido
+from light9.midifade.eventqueue import EventQueue
+from light9.midifade.pages import Pages
+from light9.midifade.writeback import WriteBackFaders
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+
+log = logging.getLogger()
+
+_csb = []  # holds references to the input callbacks
+openPorts = []
+
+def listenToMidiInputs(q: EventQueue):
+    """put midi events on the EventQueue (mido invokes the callback from a background thread)"""
+
+    for inputName in mido.get_input_names():  # type: ignore
+        if inputName.startswith('Keystation'):
+            dev = "keystation"
+        elif inputName.startswith('BCF2000'):
+            dev = 'bcf2000'
+        elif inputName.startswith('QUNEO'):
+            dev = 'quneo'
+        else:
+            continue
+        log.info(f'listening on input {inputName} {dev=}')
+        cb = q.callbackFor(dev)
+        _csb.append(cb)
+        openPorts.append(mido.open_input(  # type: ignore
+            inputName,  #
+            callback=cb))
+
+
+def connectToMidiOutput(graph: SyncedGraph, pages: Pages, _lastSet: dict[int, int]):
+    for outputName in mido.get_output_names():  # type: ignore
+        if outputName.startswith('BCF2000'):
+            bcf_out = mido.open_output(outputName)  # type: ignore
+            wb = WriteBackFaders(graph, pages, bcf_out, _lastSet)
+            graph.addHandler(wb.update)
+            break
--- a/src/light9/midifade/midifade.py	Mon May 13 21:38:39 2024 -0700
+++ b/src/light9/midifade/midifade.py	Mon May 13 21:39:53 2024 -0700
@@ -1,4 +1,3 @@
-#!bin/python
 """
 Read midi events, write fade levels to graph
 
@@ -6,133 +5,66 @@
     amidi -l
 """
 import asyncio
+from functools import partial
 import logging
-import traceback
-from typing import Dict, List, cast
-from light9.effect.edit import clamp
 
 import mido
-from rdfdb.syncedgraph.syncedgraph import SyncedGraph
-from rdflib import RDF, ConjunctiveGraph, Literal, URIRef
-from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph
 from light9 import networking
+from light9.effect.edit import clamp
+from light9.midifade.eventqueue import EventQueue
+from light9.midifade.mididevs import connectToMidiOutput, listenToMidiInputs
+from light9.midifade.pages import Pages
 from light9.namespaces import L9
 from light9.newtypes import decimalLiteral
 from light9.run_local import log
 from light9.showconfig import showUri
-
-mido.set_backend('alsa_midi.mido_backend')
-MAX_SEND_RATE = 30
-
-_lastSet = {}  #midictlchannel:value7bit
-
-currentFaders = {}  # midi control channel num : FaderUri
-ctx = URIRef(showUri() + '/fade')
-
-
-def compileCurrents(graph):
-    currentFaders.clear()
-    try:
-        new = getChansToFaders(graph)
-    except ValueError:
-        return  # e.g. empty-graph startup
-    currentFaders.update(new)
-
-
-def getGraphMappingNode(g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef:
-    mapping = g.value(L9['midiControl'], L9['map'])
-    if mapping is None:
-        raise ValueError('no :midiControl :map ?mapping')
-    midiDev = g.value(mapping, L9['midiDev'])
-    ourDev = 'bcf2000'
-    if midiDev != Literal(ourDev):
-        raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
-    return mapping
-
-
-def getCurMappedPage(g: SyncedGraph):
-    mapping = getGraphMappingNode(g)
-    return g.value(mapping, L9['outputs'])
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import URIRef
 
 
-def setCurMappedPage(g: SyncedGraph, mapping: URIRef, newPage: URIRef):
-    g.patchObject(ctx, mapping, L9.outputs, newPage)
+# inlined copy of SyncedGraph.patchObject that awaits the patch instead of spawning a task
+async def asyncPatchObject(graph: SyncedGraph, context: URIRef, subject, predicate: URIRef, newObject):
+    p = graph.getObjectPatch(context, subject, predicate, newObject)
+    # if not p.isEmpty():
+    #     log.debug("patchObject %r" % p.jsonRepr)
+    await graph.patch(p)
 
 
-def getChansToFaders(g: SyncedGraph) -> Dict[int, URIRef]:
-    fadePage = getCurMappedPage(g)
-    ret = []
-    for f in g.objects(fadePage, L9.fader):
-        columnLit = cast(Literal, g.value(f, L9['column']))
-        col = int(columnLit.toPython())
-        ret.append((col, f))
-
-    ret.sort()
-    ctl_channels = list(range(81, 88 + 1))
-    out = {}
-    for chan, (col, f) in zip(ctl_channels, ret):
-        out[chan] = f
-    return out
-
-
-def changePage(g: SyncedGraph, dp: int):
-    """dp==-1, make the previous page active, etc. Writes to graph"""
-
-    with g.currentState() as current:
-        allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp))
-        mapping = getGraphMappingNode(current)
-        curPage = current.value(mapping, L9.outputs)
-    if curPage is None:
-        curPage = allPages[0]
-    idx = allPages.index(curPage)
-    newIdx = clamp(idx + dp, 0, len(allPages) - 1)
-    print('change from ', idx, newIdx)
-    newPage = allPages[newIdx]
-    setCurMappedPage(g, mapping, newPage)
-
-
-def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
-    log.info(f'setFader(fader={fader}, strength={strength:.03f}')
+async def writeHwValueToGraph(graph: SyncedGraph, ctx, fader: URIRef, strength: float):
+    log.info(f'setFader(fader={fader}, strength={strength:.03f})')
     valueLit = decimalLiteral(round(strength, 3))
     with graph.currentState() as g:
         fadeSet = g.value(fader, L9['setting'])
     if fadeSet is None:
         raise ValueError(f'fader {fader} has no :setting')
-    graph.patchObject(ctx, fadeSet, L9['value'], valueLit)
+    await asyncPatchObject(graph, ctx, fadeSet, L9['value'], valueLit)
 
 
-def changeGrandMaster(graph: SyncedGraph, newValue: float):
+def changeGrandMaster(graph: SyncedGraph, newValue: float, ctx: URIRef):
     graph.patchObject(ctx, L9.grandMaster, L9['value'], decimalLiteral(newValue))
 
 
-def onMessage(graph: SyncedGraph, ctx: URIRef, m: Dict):
+async def onMessage(graph: SyncedGraph, pages: Pages, ctx: URIRef, _lastSet: dict[int, int], m: dict):
     if m['type'] == 'active_sensing':
         return
     if m['type'] == 'control_change':
         if m['dev'] == 'bcf2000' and m['control'] == 91:
-            changePage(graph, -1)
+            pages.changePage(-1)
             return
         if m['dev'] == 'bcf2000' and m['control'] == 92:
-            changePage(graph, 1)
+            pages.changePage(1)
             return
         if m['dev'] == 'bcf2000' and m['control'] == 8:
-            changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1))
+            changeGrandMaster(graph, clamp(m['value'] / 127 * 1.5, 0, 1), ctx)
             return
 
         try:
-            fader = {
-                'quneo': {
-                    44: L9['show/dance2023/fadePage1f0'],
-                    45: L9['show/dance2023/fadePage1f0'],
-                    46: L9['show/dance2023/fadePage1f0'],
-                },
-                'bcf2000': currentFaders,
-            }[m['dev']][m['control']]
+            fader = pages.lookupFader(m['dev'], m['control'])
         except KeyError:
             log.info(f'unknown control {m}')
             return
         try:
-            writeHwValueToGraph(graph, ctx, fader, m['value'] / 127)
+            await writeHwValueToGraph(graph, ctx, fader, m['value'] / 127)
             _lastSet[m['control']] = m['value']
         except ValueError as e:
             log.warning(f'{e!r} - ignoring')
@@ -140,103 +72,26 @@
         log.info(f'unhandled message {m}')
 
 
-def reduceToLatestValue(ms: List[Dict]) -> List[Dict]:
-    merge = {}
-    for m in ms:
-        normal_key = tuple(sorted(dict((k, v) for k, v in m.items() if k != 'value')))
-        merge[normal_key] = m
-    return merge.values()
-
-
-class WriteBackFaders:
-
-    def __init__(self, graph: SyncedGraph, bcf_out, getCurrentValue):
-        self.graph = graph
-        self.bcf_out = bcf_out
-        self.getCurrentValue = getCurrentValue
-
-    def update(self):
-        try:
-            self._update()
-        except ValueError as e:
-            log.warning(repr(e))
-
-    def _update(self):
-        g = self.graph
-        nupdated = 0
-        m = getChansToFaders(g)
-        for midi_ctl_addr, f in m.items():
-            fset = g.value(f, L9.setting)
-            # could split this to a separate handler per fader
-            value = g.value(fset, L9.value).toPython()
-            hwcurrent = self.getCurrentValue(midi_ctl_addr)
-            hwgoal = int(value * 127)
-            print(f'{f} {hwcurrent=} {hwgoal=}')
-            if abs(hwcurrent - hwgoal) > 2:
-                self.sendToBcf(midi_ctl_addr, hwgoal)
-                nupdated += 1
-        log.info(f'wrote to {nupdated} of {len(m)} mapped faders')
-
-    def sendToBcf(self, control, value):
-        _lastSet[control] = value
-        msg = mido.Message('control_change', control=control, value=value)
-        self.bcf_out.send(msg)
-
-
 async def main():
     logging.getLogger('autodepgraphapi').setLevel(logging.INFO)
     logging.getLogger('syncedgraph').setLevel(logging.INFO)
     logging.getLogger('graphedit').setLevel(logging.INFO)
 
-    graph = SyncedGraph(networking.rdfdb.url, "midifade")
-    ctx = URIRef(showUri() + '/fade')
-
-    msgs = asyncio.Queue()
-    loop = asyncio.get_event_loop()
-
-    def onMessageMidoThread(dev, message):
-        loop.call_soon_threadsafe(msgs.put_nowait, message.dict() | {'dev': dev})
+    mido.set_backend('alsa_midi.mido_backend')
 
-    async def reader():
-        while True:
-            recents = [await msgs.get()]
-            while not msgs.empty():
-                recents.append(msgs.get_nowait())
-            try:
-                for msg in reduceToLatestValue(recents):
-                    onMessage(graph, ctx, msg)
-            except Exception as e:
-                traceback.print_exc()
-                log.warning("error in onMessage- continuing anyway")
-            await asyncio.sleep(1 / MAX_SEND_RATE)
+    MAX_SEND_RATE = 30  # max onMessage batches per second
+
+    _lastSet: dict[int, int] = {}  # midi ctl channel -> 7-bit value
 
-    asyncio.create_task(reader())
-    openPorts = []
-    for inputName in mido.get_input_names():  # type: ignore
-        if inputName.startswith('Keystation'):
-            dev = "keystation"
-        elif inputName.startswith('BCF2000'):
-            dev = 'bcf2000'
-        elif inputName.startswith('QUNEO'):
-            dev = 'quneo'
-        else:
-            continue
-        log.info(f'listening on input {inputName} {dev=}')
-        openPorts.append(mido.open_input(  # type: ignore
-            inputName,  #
-            callback=lambda message, dev=dev: onMessageMidoThread(dev, message)))
+    ctx = URIRef(showUri() + '/fade')
+    graph = SyncedGraph(networking.rdfdb.url, "midifade")
+    pages = Pages(graph, ctx)
+    queue = EventQueue(MAX_SEND_RATE, partial(onMessage, graph, pages, ctx, _lastSet))
+    listenToMidiInputs(queue)
+    connectToMidiOutput(graph, pages, _lastSet)
+    graph.addHandler(pages.compileCurrents)
 
-    graph.addHandler(lambda: compileCurrents(graph))
-
-    for outputName in mido.get_output_names():  # type: ignore
-        if outputName.startswith('BCF2000'):
-            bcf_out = mido.open_output(outputName)  # type: ignore
-            wb = WriteBackFaders(graph, bcf_out, getCurrentValue=lambda f: _lastSet.get(f, 0))
-            graph.addHandler(wb.update)
-            break
-
-    while True:
-        await asyncio.sleep(1)
+    await queue.run()
 
 
 if __name__ == '__main__':
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/light9/midifade/pages.py	Mon May 13 21:39:53 2024 -0700
@@ -0,0 +1,88 @@
+import logging
+from typing import cast
+
+from light9.effect.edit import clamp
+from light9.namespaces import L9
+from rdfdb.syncedgraph.readonly_graph import ReadOnlyConjunctiveGraph
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import RDF, Literal, URIRef
+
+log = logging.getLogger()
+
+
+class Pages:
+    """converts between fader numbers and FaderUri, which is a mapping that can
+    be changed by moving between pages"""
+
+    def __init__(self, graph: SyncedGraph, ctx: URIRef):
+        self.graph = graph
+        self.ctx = ctx
+
+        self.currentFaders: dict[int, URIRef] = {}  # midi control channel num -> FaderUri
+
+    def getChansToFaders(self) -> dict[int, URIRef]:
+        fadePage = self.getCurMappedPage()
+        ret = []
+        for f in self.graph.objects(fadePage, L9.fader):
+            columnLit = cast(Literal, self.graph.value(f, L9['column']))
+            col = int(columnLit.toPython())
+            ret.append((col, f))
+
+        ret.sort()
+        ctl_channels = list(range(81, 88 + 1))
+        out = {}
+        for chan, (col, f) in zip(ctl_channels, ret):
+            out[chan] = f
+        return out
+
+    def changePage(self, dp: int):
+        """dp==-1, make the previous page active, etc. Writes to graph"""
+
+        with self.graph.currentState() as current:
+            allPages = sorted(current.subjects(RDF.type, L9.FadePage), key=lambda fp: str(fp))
+            mapping = self.getGraphMappingNode(current)
+            curPage = current.value(mapping, L9.outputs)
+        if curPage is None:
+            curPage = allPages[0]
+        idx = allPages.index(curPage)
+        newIdx = clamp(idx + dp, 0, len(allPages) - 1)
+        log.info(f'change page from {idx} to {newIdx}')
+        newPage = allPages[newIdx]
+        self.setCurMappedPage(mapping, newPage)
+
+    def getCurMappedPage(self) -> URIRef:
+        mapping = self.getGraphMappingNode(self.graph)
+        ret = self.graph.value(mapping, L9['outputs'])
+        assert ret is not None
+        return ret
+
+    def setCurMappedPage(self, mapping: URIRef, newPage: URIRef):
+        self.graph.patchObject(self.ctx, mapping, L9.outputs, newPage)
+
+    def getGraphMappingNode(self, g: ReadOnlyConjunctiveGraph | SyncedGraph) -> URIRef:
+        mapping = g.value(L9['midiControl'], L9['map'])
+        if mapping is None:
+            raise ValueError('no :midiControl :map ?mapping')
+        midiDev = g.value(mapping, L9['midiDev'])
+        ourDev = 'bcf2000'
+        if midiDev != Literal(ourDev):
+            raise NotImplementedError(f'need {mapping} to have :midiDev {ourDev!r}')
+        return mapping
+
+    def compileCurrents(self):
+        self.currentFaders.clear()
+        try:
+            new = self.getChansToFaders()
+        except ValueError:
+            return  # e.g. empty-graph startup
+        self.currentFaders.update(new)
+
+    def lookupFader(self, dev: str, control: int) -> URIRef:
+        return {
+            'quneo': {
+                44: L9['show/dance2023/fadePage1f0'],
+                45: L9['show/dance2023/fadePage1f0'],
+                46: L9['show/dance2023/fadePage1f0'],
+            },
+            'bcf2000': self.currentFaders,
+        }[dev][control]
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/light9/midifade/writeback.py	Mon May 13 21:39:53 2024 -0700
@@ -0,0 +1,59 @@
+import asyncio
+import logging
+from dataclasses import dataclass
+from typing import cast
+
+import mido
+from light9.midifade.pages import Pages
+from light9.namespaces import L9
+from rdfdb.syncedgraph.syncedgraph import SyncedGraph
+from rdflib import Literal
+from debouncer import DebounceOptions, debounce  # not used yet; see the debounce plan in _update
+
+log = logging.getLogger()
+
+
+@dataclass
+class WriteBackFaders:
+    graph: SyncedGraph
+    pages: Pages
+    bcf_out: mido.ports.BaseOutput
+    _lastSet: dict[int, int]
+
+    def getCurrentValue(self, f):
+        return self._lastSet.get(f, 0)
+
+    def update(self):
+        try:
+            asyncio.create_task(self._update())
+        except ValueError as e:
+            log.warning(repr(e))
+
+    async def _update(self):
+        # disabled for now. to make this work again:
+        #  - track the goal of all sliders
+        #  - in a debounced handler, sendToBcf (see the sketch after this diff)
+        return  # skip the write-back logic below until then
+        g = self.graph
+        nupdated = 0
+        m = self.pages.getChansToFaders()
+        for midi_ctl_addr, f in m.items():
+            fset = g.value(f, L9.setting)
+            # could split this to a separate handler per fader
+            value = cast(Literal, g.value(fset, L9.value)).toPython()
+            hwcurrent = self.getCurrentValue(midi_ctl_addr)
+            hwgoal = int(value * 127)
+            maxValueDiff = 3
+            # todo: 3 is no good; we should correct after a short time of no
+            # movement
+            if abs(hwcurrent - hwgoal) > maxValueDiff:
+                log.info(f'writing back to {f} {hwcurrent=} {hwgoal=}')
+                self.sendToBcf(midi_ctl_addr, hwgoal)
+                nupdated += 1
+        if nupdated > 0:
+            log.info(f'wrote to {nupdated} of {len(m)} mapped faders')
+
+    def sendToBcf(self, control: int, value: int):
+        self._lastSet[control] = value
+        msg = mido.Message('control_change', control=control, value=value)
+        self.bcf_out.send(msg)
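
The _update comment above leaves the BCF write-back as a plan. A minimal sketch of what that debounced handler could look like, using plain asyncio (the debouncer package imported above isn't exercised here and its API isn't shown in this changeset); DebouncedWriteBack, setGoal and quietPeriod are hypothetical names for illustration:

    import asyncio


    class DebouncedWriteBack:
        """collect fader goals; write them to the BCF only after a short quiet period"""

        def __init__(self, sendToBcf, quietPeriod: float = 0.3):
            self.sendToBcf = sendToBcf  # e.g. WriteBackFaders.sendToBcf
            self.quietPeriod = quietPeriod
            self.goals: dict[int, int] = {}  # midi ctl channel -> goal value 0..127
            self._flushTask: asyncio.Task | None = None

        def setGoal(self, control: int, value: int):
            # record the latest goal and restart the quiet timer
            self.goals[control] = value
            if self._flushTask is not None:
                self._flushTask.cancel()
            self._flushTask = asyncio.create_task(self._flushAfterQuiet())

        async def _flushAfterQuiet(self):
            # once the goals stop changing, send every pending goal to the hardware
            await asyncio.sleep(self.quietPeriod)
            for control, value in self.goals.items():
                self.sendToBcf(control, value)
            self.goals.clear()

WriteBackFaders.update could then record a goal per mapped fader via setGoal instead of sending immediately, which is one way to satisfy the "correct after a short time of no movement" todo above.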